diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 11:40:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 11:40:59 +0000 |
commit | bc4e624732bd51c0dd1e9529cf228e8c23127732 (patch) | |
tree | d95dab8960e9d02d3b95f8653074ad2e54ca207c /cmd | |
parent | Initial commit. (diff) | |
download | icingadb-bc4e624732bd51c0dd1e9529cf228e8c23127732.tar.xz icingadb-bc4e624732bd51c0dd1e9529cf228e8c23127732.zip |
Adding upstream version 1.1.1.upstream/1.1.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'cmd')
-rw-r--r-- | cmd/icingadb-migrate/cache.go | 298 | ||||
-rw-r--r-- | cmd/icingadb-migrate/convert.go | 865 | ||||
-rw-r--r-- | cmd/icingadb-migrate/embed/comment_query.sql | 11 | ||||
-rw-r--r-- | cmd/icingadb-migrate/embed/downtime_query.sql | 14 | ||||
-rw-r--r-- | cmd/icingadb-migrate/embed/event_time_cache_schema.sql | 15 | ||||
-rw-r--r-- | cmd/icingadb-migrate/embed/flapping_query.sql | 9 | ||||
-rw-r--r-- | cmd/icingadb-migrate/embed/ido_migration_progress_schema.sql | 9 | ||||
-rw-r--r-- | cmd/icingadb-migrate/embed/notification_query.sql | 9 | ||||
-rw-r--r-- | cmd/icingadb-migrate/embed/previous_hard_state_cache_schema.sql | 22 | ||||
-rw-r--r-- | cmd/icingadb-migrate/embed/state_query.sql | 9 | ||||
-rw-r--r-- | cmd/icingadb-migrate/main.go | 488 | ||||
-rw-r--r-- | cmd/icingadb-migrate/misc.go | 321 | ||||
-rw-r--r-- | cmd/icingadb/main.go | 403 |
13 files changed, 2473 insertions, 0 deletions
diff --git a/cmd/icingadb-migrate/cache.go b/cmd/icingadb-migrate/cache.go new file mode 100644 index 0000000..d1854c9 --- /dev/null +++ b/cmd/icingadb-migrate/cache.go @@ -0,0 +1,298 @@ +package main + +import ( + "database/sql" + _ "embed" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" + "math" + "strings" + "time" +) + +//go:embed embed/event_time_cache_schema.sql +var eventTimeCacheSchema string + +//go:embed embed/previous_hard_state_cache_schema.sql +var previousHardStateCacheSchema string + +// buildEventTimeCache rationale: +// +// Icinga DB's flapping_history#id always needs start_time. flapping_end rows would need an IDO subquery for that. +// That would make the IDO reading even slower than the Icinga DB writing. +// Therefore: Stream IDO's icinga_flappinghistory once, compute flapping_history#start_time +// and cache it into an SQLite database. Then steam from that database and the IDO. +// +// Similar for acknowledgements. (On non-recoverable errors the whole program exits.) +func buildEventTimeCache(ht *historyType, idoColumns []string) { + type row = struct { + Id uint64 + EventTime int64 + EventTimeUsec uint32 + EventIsStart uint8 + ObjectId uint64 + } + + chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) { + var checkpoint struct { + Cnt int64 + MaxId sql.NullInt64 + } + cacheGet(*tx, &checkpoint, "SELECT COUNT(*) cnt, MAX(history_id) max_id FROM end_start_time") + + ht.bar.SetCurrent(checkpoint.Cnt * 2) + + // Stream source data... + sliceIdoHistory( + ht, + "SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+ + // For actual migration icinga_objects will be joined anyway, + // so it makes no sense to take vanished objects into account. + " xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+ + ht.idoIdColumn+" <= :toid AND xh."+ + ht.idoIdColumn+" > :checkpoint ORDER BY xh."+ht.idoIdColumn+" LIMIT :bulk", + nil, checkpoint.MaxId.Int64, // ... since we were interrupted: + func(idoRows []row) (checkpoint interface{}) { + for _, idoRow := range idoRows { + if idoRow.EventIsStart == 0 { + // Ack/flapping end event. Get the start event time: + var lst []struct { + EventTime int64 + EventTimeUsec uint32 + } + cacheSelect( + *tx, &lst, "SELECT event_time, event_time_usec FROM last_start_time WHERE object_id=?", + idoRow.ObjectId, + ) + + // If we have that, ... + if len(lst) > 0 { + // ... save the start event time for the actual migration: + cacheExec( + *tx, + "INSERT INTO end_start_time(history_id, event_time, event_time_usec) VALUES (?, ?, ?)", + idoRow.Id, lst[0].EventTime, lst[0].EventTimeUsec, + ) + + // This previously queried info isn't needed anymore. + cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId) + } + } else { + // Ack/flapping start event directly after another start event (per checkable). + // The old one won't have (but the new one will) an end event (which will need its time). + cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId) + + // An ack/flapping start event. The following end event (per checkable) will need its time. + cacheExec( + *tx, "INSERT INTO last_start_time(object_id, event_time, event_time_usec) VALUES (?, ?, ?)", + idoRow.ObjectId, idoRow.EventTime, idoRow.EventTimeUsec, + ) + } + + commitPeriodically() + checkpoint = idoRow.Id + } + + ht.bar.IncrBy(len(idoRows)) + return + }, + ) + + // This never queried info isn't needed anymore. + cacheExec(*tx, "DELETE FROM last_start_time") + }) + + ht.bar.SetTotal(ht.bar.Current(), true) +} + +// buildPreviousHardStateCache rationale: +// +// Icinga DB's state_history#previous_hard_state would need a subquery. +// That make the IDO reading even slower than the Icinga DB writing. +// Therefore: Stream IDO's icinga_statehistory once, compute state_history#previous_hard_state +// and cache it into an SQLite database. Then steam from that database and the IDO. +// +// Similar for notifications. (On non-recoverable errors the whole program exits.) +func buildPreviousHardStateCache(ht *historyType, idoColumns []string) { + type row = struct { + Id uint64 + ObjectId uint64 + LastHardState uint8 + } + + chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) { + var nextIds struct { + Cnt int64 + MinId sql.NullInt64 + } + cacheGet(*tx, &nextIds, "SELECT COUNT(*) cnt, MIN(history_id) min_id FROM next_ids") + + var previousHardStateCnt int64 + cacheGet(*tx, &previousHardStateCnt, "SELECT COUNT(*) FROM previous_hard_state") + + var checkpoint int64 + if nextIds.MinId.Valid { // there are next_ids + checkpoint = nextIds.MinId.Int64 // this kind of caches is filled descending + } else { // there aren't any next_ids + // next_ids contains the most recently processed IDs and is only empty if... + if previousHardStateCnt == 0 { + // ... we didn't actually start yet... + checkpoint = math.MaxInt64 // start from the largest (possible) ID + } else { + // ... or we've already finished. + checkpoint = 0 // make following query no-op + } + } + + ht.bar.SetCurrent(previousHardStateCnt + nextIds.Cnt) + + // We continue where we finished before. As we build the cache in reverse chronological order: + // 1. If the history grows between two migration trials, we won't migrate the difference. Workarounds: + // a. Start migration after Icinga DB is up and running. + // b. Remove the cache before the next migration trial. + // 2. If the history gets cleaned up between two migration trials, + // the difference either just doesn't appear in the cache or - if already there - will be ignored later. + + // Stream source data... + sliceIdoHistory( + ht, + "SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+ + // For actual migration icinga_objects will be joined anyway, + // so it makes no sense to take vanished objects into account. + " xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+ + ht.idoIdColumn+" <= :toid AND xh."+ + ht.idoIdColumn+" < :checkpoint ORDER BY xh."+ht.idoIdColumn+" DESC LIMIT :bulk", + nil, checkpoint, // ... since we were interrupted: + func(idoRows []row) (checkpoint interface{}) { + for _, idoRow := range idoRows { + var nhs []struct{ NextHardState uint8 } + cacheSelect(*tx, &nhs, "SELECT next_hard_state FROM next_hard_state WHERE object_id=?", idoRow.ObjectId) + + if len(nhs) < 1 { // we just started (per checkable) + // At the moment (we're "travelling back in time") that's the checkable's hard state: + cacheExec( + *tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)", + idoRow.ObjectId, idoRow.LastHardState, + ) + + // But for the current time point the previous hard state isn't known, yet: + cacheExec( + *tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)", + idoRow.Id, idoRow.ObjectId, + ) + } else if idoRow.LastHardState == nhs[0].NextHardState { + // The hard state didn't change yet (per checkable), + // so this time point also awaits the previous hard state. + cacheExec( + *tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)", + idoRow.Id, idoRow.ObjectId, + ) + } else { // the hard state changed (per checkable) + // That past hard state is now available for the processed future time points: + cacheExec( + *tx, + "INSERT INTO previous_hard_state(history_id, previous_hard_state) "+ + "SELECT history_id, ? FROM next_ids WHERE object_id=?", + idoRow.LastHardState, idoRow.ObjectId, + ) + + // Now they have what they wanted: + cacheExec(*tx, "DELETE FROM next_hard_state WHERE object_id=?", idoRow.ObjectId) + cacheExec(*tx, "DELETE FROM next_ids WHERE object_id=?", idoRow.ObjectId) + + // That's done. + // Now do the same thing as in the "we just started" case above, for the same reason: + + cacheExec( + *tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)", + idoRow.ObjectId, idoRow.LastHardState, + ) + + cacheExec( + *tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)", + idoRow.Id, idoRow.ObjectId, + ) + } + + commitPeriodically() + checkpoint = idoRow.Id + } + + ht.bar.IncrBy(len(idoRows)) + return + }, + ) + + // No past hard state is available for the processed future time points, assuming pending: + cacheExec( + *tx, "INSERT INTO previous_hard_state(history_id, previous_hard_state) SELECT history_id, 99 FROM next_ids", + ) + + // Now they should have what they wanted: + cacheExec(*tx, "DELETE FROM next_hard_state") + cacheExec(*tx, "DELETE FROM next_ids") + }) + + ht.bar.SetTotal(ht.bar.Current(), true) +} + +// chunkCacheTx rationale: during do operate on cache via *tx. After every completed operation call commitPeriodically() +// which periodically commits *tx and starts a new tx. (That's why tx is a **, not just a *.) +// (On non-recoverable errors the whole program exits.) +func chunkCacheTx(cache *sqlx.DB, do func(tx **sqlx.Tx, commitPeriodically func())) { + logger := log.With("backend", "cache") + + tx, err := cache.Beginx() + if err != nil { + logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction")) + } + + const commitInterval = 5 * time.Minute + nextCommit := time.Now().Add(commitInterval) + + do(&tx, func() { // commitPeriodically + if now := time.Now(); now.After(nextCommit) { + if err := tx.Commit(); err != nil { + logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction")) + } + + var err error + + tx, err = cache.Beginx() + if err != nil { + logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction")) + } + + nextCommit = nextCommit.Add(commitInterval) + } + }) + + if err := tx.Commit(); err != nil { + logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction")) + } +} + +// cacheGet does cache.Get(dest, query, args...). (On non-recoverable errors the whole program exits.) +func cacheGet(cache interface { + Get(dest interface{}, query string, args ...interface{}) error +}, dest interface{}, query string, args ...interface{}) { + if err := cache.Get(dest, query, args...); err != nil { + log.With("backend", "cache", "query", query, "args", args). + Fatalf("%+v", errors.Wrap(err, "can't perform query")) + } +} + +// cacheSelect does cacheTx.Select(dest, query, args...). (On non-recoverable errors the whole program exits.) +func cacheSelect(cacheTx *sqlx.Tx, dest interface{}, query string, args ...interface{}) { + if err := cacheTx.Select(dest, query, args...); err != nil { + log.With("backend", "cache", "query", query, "args", args). + Fatalf("%+v", errors.Wrap(err, "can't perform query")) + } +} + +// cacheExec does cacheTx.Exec(dml, args...). On non-recoverable errors the whole program exits. +func cacheExec(cacheTx *sqlx.Tx, dml string, args ...interface{}) { + if _, err := cacheTx.Exec(dml, args...); err != nil { + log.With("backend", "cache", "dml", dml, "args", args).Fatalf("%+v", errors.Wrap(err, "can't perform DML")) + } +} diff --git a/cmd/icingadb-migrate/convert.go b/cmd/icingadb-migrate/convert.go new file mode 100644 index 0000000..e14746e --- /dev/null +++ b/cmd/icingadb-migrate/convert.go @@ -0,0 +1,865 @@ +package main + +import ( + "database/sql" + _ "embed" + "github.com/icinga/icingadb/pkg/contracts" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/icingadb/v1/history" + icingadbTypes "github.com/icinga/icingadb/pkg/types" + "github.com/icinga/icingadb/pkg/utils" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" + "strconv" + "strings" + "time" +) + +//go:embed embed/comment_query.sql +var commentMigrationQuery string + +//go:embed embed/downtime_query.sql +var downtimeMigrationQuery string + +//go:embed embed/flapping_query.sql +var flappingMigrationQuery string + +//go:embed embed/notification_query.sql +var notificationMigrationQuery string + +//go:embed embed/state_query.sql +var stateMigrationQuery string + +type commentRow = struct { + CommenthistoryId uint64 + EntryTime sql.NullInt64 + EntryTimeUsec uint32 + EntryType uint8 + AuthorName string + CommentData string + IsPersistent uint8 + ExpirationTime int64 + DeletionTime int64 + DeletionTimeUsec uint32 + Name string + ObjecttypeId uint8 + Name1 string + Name2 string +} + +func convertCommentRows( + env string, envId icingadbTypes.Binary, + _ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []commentRow, +) (stages []icingaDbOutputStage, checkpoint any) { + var commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck []contracts.Entity + + for _, row := range idoRows { + checkpoint = row.CommenthistoryId + + if !row.EntryTime.Valid { + continue + } + + typ := objectTypes[row.ObjecttypeId] + hostId := calcObjectId(env, row.Name1) + serviceId := calcServiceId(env, row.Name1, row.Name2) + + switch row.EntryType { + case 1: // user + id := calcObjectId(env, row.Name) + entryTime := convertTime(row.EntryTime.Int64, row.EntryTimeUsec) + removeTime := convertTime(row.DeletionTime, row.DeletionTimeUsec) + expireTime := convertTime(row.ExpirationTime, 0) + + commentHistory = append(commentHistory, &history.CommentHistory{ + CommentHistoryEntity: history.CommentHistoryEntity{CommentId: id}, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + CommentHistoryUpserter: history.CommentHistoryUpserter{ + RemoveTime: removeTime, + HasBeenRemoved: icingadbTypes.Bool{Bool: !removeTime.Time().IsZero(), Valid: true}, + }, + EntryTime: entryTime, + Author: row.AuthorName, + Comment: row.CommentData, + EntryType: icingadbTypes.CommentType(row.EntryType), + IsPersistent: icingadbTypes.Bool{Bool: row.IsPersistent != 0, Valid: true}, + IsSticky: icingadbTypes.Bool{Bool: false, Valid: true}, + ExpireTime: expireTime, + }) + + h1 := &history.HistoryComment{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: hashAny([]string{env, "comment_add", row.Name})}, + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "comment_add", + }, + CommentHistoryId: id, + EntryTime: entryTime, + } + + h1.EventTime.History = h1 + allHistoryComment = append(allHistoryComment, h1) + + if !removeTime.Time().IsZero() { // remove + h2 := &history.HistoryComment{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: hashAny([]string{env, "comment_remove", row.Name})}, + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "comment_remove", + }, + CommentHistoryId: id, + EntryTime: entryTime, + RemoveTime: removeTime, + ExpireTime: expireTime, + } + + h2.EventTime.History = h2 + allHistoryComment = append(allHistoryComment, h2) + } + case 4: // ack + name := row.Name1 + if row.Name2 != "" { + name += "!" + row.Name2 + } + + setTime := convertTime(row.EntryTime.Int64, row.EntryTimeUsec) + setTs := float64(setTime.Time().UnixMilli()) + clearTime := convertTime(row.DeletionTime, row.DeletionTimeUsec) + acknowledgementHistoryId := hashAny([]any{env, name, setTs}) + + acknowledgementHistory = append(acknowledgementHistory, &history.AcknowledgementHistory{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{Id: acknowledgementHistoryId}, + }, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + AckHistoryUpserter: history.AckHistoryUpserter{ClearTime: clearTime}, + SetTime: setTime, + Author: icingadbTypes.String{ + NullString: sql.NullString{ + String: row.AuthorName, + Valid: true, + }, + }, + Comment: icingadbTypes.String{ + NullString: sql.NullString{ + String: row.CommentData, + Valid: true, + }, + }, + ExpireTime: convertTime(row.ExpirationTime, 0), + IsPersistent: icingadbTypes.Bool{ + Bool: row.IsPersistent != 0, + Valid: true, + }, + }) + + h1 := &history.HistoryAck{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{ + Id: hashAny([]any{env, "ack_set", name, setTs}), + }, + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "ack_set", + }, + AcknowledgementHistoryId: acknowledgementHistoryId, + SetTime: setTime, + ClearTime: clearTime, + } + + h1.EventTime.History = h1 + allHistoryAck = append(allHistoryAck, h1) + + if !clearTime.Time().IsZero() { + h2 := &history.HistoryAck{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{ + Id: hashAny([]any{env, "ack_clear", name, setTs}), + }, + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "ack_clear", + }, + AcknowledgementHistoryId: acknowledgementHistoryId, + SetTime: setTime, + ClearTime: clearTime, + } + + h2.EventTime.History = h2 + allHistoryAck = append(allHistoryAck, h2) + } + } + } + + stages = []icingaDbOutputStage{ + {insert: commentHistory}, + {insert: acknowledgementHistory}, + {insert: allHistoryComment}, + {insert: allHistoryAck}, + } + return +} + +type downtimeRow = struct { + DowntimehistoryId uint64 + EntryTime int64 + AuthorName string + CommentData string + IsFixed uint8 + Duration int64 + ScheduledStartTime sql.NullInt64 + ScheduledEndTime int64 + WasStarted uint8 + ActualStartTime int64 + ActualStartTimeUsec uint32 + ActualEndTime int64 + ActualEndTimeUsec uint32 + WasCancelled uint8 + TriggerTime int64 + Name string + ObjecttypeId uint8 + Name1 string + Name2 string + TriggeredBy string +} + +func convertDowntimeRows( + env string, envId icingadbTypes.Binary, + _ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []downtimeRow, +) (stages []icingaDbOutputStage, checkpoint any) { + var downtimeHistory, allHistory, sla []contracts.Entity + + for _, row := range idoRows { + checkpoint = row.DowntimehistoryId + + if !row.ScheduledStartTime.Valid || row.WasStarted == 0 { + continue + } + + id := calcObjectId(env, row.Name) + typ := objectTypes[row.ObjecttypeId] + hostId := calcObjectId(env, row.Name1) + serviceId := calcServiceId(env, row.Name1, row.Name2) + scheduledStart := convertTime(row.ScheduledStartTime.Int64, 0) + scheduledEnd := convertTime(row.ScheduledEndTime, 0) + triggerTime := convertTime(row.TriggerTime, 0) + actualStart := convertTime(row.ActualStartTime, row.ActualStartTimeUsec) + actualEnd := convertTime(row.ActualEndTime, row.ActualEndTimeUsec) + var startTime, endTime, cancelTime icingadbTypes.UnixMilli + + if scheduledEnd.Time().IsZero() { + scheduledEnd = icingadbTypes.UnixMilli(scheduledStart.Time().Add(time.Duration(row.Duration) * time.Second)) + } + + if actualStart.Time().IsZero() { + startTime = scheduledStart + } else { + startTime = actualStart + } + + if actualEnd.Time().IsZero() { + endTime = scheduledEnd + } else { + endTime = actualEnd + } + + if triggerTime.Time().IsZero() { + triggerTime = startTime + } + + if row.WasCancelled != 0 { + cancelTime = actualEnd + } + + downtimeHistory = append(downtimeHistory, &history.DowntimeHistory{ + DowntimeHistoryEntity: history.DowntimeHistoryEntity{DowntimeId: id}, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + DowntimeHistoryUpserter: history.DowntimeHistoryUpserter{ + HasBeenCancelled: icingadbTypes.Bool{Bool: row.WasCancelled != 0, Valid: true}, + CancelTime: cancelTime, + }, + TriggeredById: calcObjectId(env, row.TriggeredBy), + EntryTime: convertTime(row.EntryTime, 0), + Author: row.AuthorName, + Comment: row.CommentData, + IsFlexible: icingadbTypes.Bool{Bool: row.IsFixed == 0, Valid: true}, + FlexibleDuration: uint64(row.Duration) * 1000, + ScheduledStartTime: scheduledStart, + ScheduledEndTime: scheduledEnd, + StartTime: startTime, + EndTime: endTime, + TriggerTime: triggerTime, + }) + + h1 := &history.HistoryDowntime{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: hashAny([]string{env, "downtime_start", row.Name})}, + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "downtime_start", + }, + DowntimeHistoryId: id, + StartTime: startTime, + } + + h1.EventTime.History = h1 + allHistory = append(allHistory, h1) + + if !actualEnd.Time().IsZero() { // remove + h2 := &history.HistoryDowntime{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: hashAny([]string{env, "downtime_end", row.Name})}, + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "downtime_end", + }, + DowntimeHistoryId: id, + StartTime: startTime, + CancelTime: cancelTime, + EndTime: endTime, + HasBeenCancelled: icingadbTypes.Bool{Bool: row.WasCancelled != 0, Valid: true}, + } + + h2.EventTime.History = h2 + allHistory = append(allHistory, h2) + } + + s := &history.SlaHistoryDowntime{ + DowntimeHistoryEntity: history.DowntimeHistoryEntity{DowntimeId: id}, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + DowntimeStart: startTime, + HasBeenCancelled: icingadbTypes.Bool{Bool: row.WasCancelled != 0, Valid: true}, + CancelTime: cancelTime, + EndTime: endTime, + } + + s.DowntimeEnd.History = s + sla = append(sla, s) + } + + stages = []icingaDbOutputStage{ + {insert: downtimeHistory}, + {insert: allHistory}, + {insert: sla}, + } + return +} + +type flappingRow = struct { + FlappinghistoryId uint64 + EventTime sql.NullInt64 + EventTimeUsec uint32 + EventType uint16 + PercentStateChange sql.NullFloat64 + LowThreshold float64 + HighThreshold float64 + ObjecttypeId uint8 + Name1 string + Name2 string +} + +func convertFlappingRows( + env string, envId icingadbTypes.Binary, + selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []flappingRow, +) (stages []icingaDbOutputStage, checkpoint any) { + if len(idoRows) < 1 { + return + } + + var cached []struct { + HistoryId uint64 + EventTime int64 + EventTimeUsec uint32 + } + selectCache( + &cached, "SELECT history_id, event_time, event_time_usec FROM end_start_time WHERE history_id BETWEEN ? AND ?", + idoRows[0].FlappinghistoryId, idoRows[len(idoRows)-1].FlappinghistoryId, + ) + + // Needed for start time (see below). + cachedById := make(map[uint64]icingadbTypes.UnixMilli, len(cached)) + for _, c := range cached { + cachedById[c.HistoryId] = convertTime(c.EventTime, c.EventTimeUsec) + } + + var flappingHistory, flappingHistoryUpserts, allHistory []contracts.Entity + for _, row := range idoRows { + checkpoint = row.FlappinghistoryId + + if !row.EventTime.Valid { + continue + } + + ts := convertTime(row.EventTime.Int64, row.EventTimeUsec) + + // Needed for ID (see below). + var start icingadbTypes.UnixMilli + if row.EventType == 1001 { // end + var ok bool + start, ok = cachedById[row.FlappinghistoryId] + + if !ok { + continue + } + } else { + start = ts + } + + name := row.Name1 + if row.Name2 != "" { + name += "!" + row.Name2 + } + + typ := objectTypes[row.ObjecttypeId] + hostId := calcObjectId(env, row.Name1) + serviceId := calcServiceId(env, row.Name1, row.Name2) + startTime := float64(start.Time().UnixMilli()) + flappingHistoryId := hashAny([]interface{}{env, name, startTime}) + + if row.EventType == 1001 { // end + // The start counterpart should already have been inserted. + flappingHistoryUpserts = append(flappingHistoryUpserts, &history.FlappingHistory{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{Id: flappingHistoryId}, + }, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + FlappingHistoryUpserter: history.FlappingHistoryUpserter{ + EndTime: ts, + PercentStateChangeEnd: icingadbTypes.Float{NullFloat64: row.PercentStateChange}, + FlappingThresholdLow: float32(row.LowThreshold), + FlappingThresholdHigh: float32(row.HighThreshold), + }, + StartTime: start, + }) + + h := &history.HistoryFlapping{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{ + Id: hashAny([]interface{}{env, "flapping_end", name, startTime}), + }, + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "flapping_end", + }, + FlappingHistoryId: flappingHistoryId, + StartTime: start, + EndTime: ts, + } + + h.EventTime.History = h + allHistory = append(allHistory, h) + } else { + flappingHistory = append(flappingHistory, &history.FlappingHistory{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{Id: flappingHistoryId}, + }, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + FlappingHistoryUpserter: history.FlappingHistoryUpserter{ + FlappingThresholdLow: float32(row.LowThreshold), + FlappingThresholdHigh: float32(row.HighThreshold), + }, + StartTime: start, + PercentStateChangeStart: icingadbTypes.Float{NullFloat64: row.PercentStateChange}, + }) + + h := &history.HistoryFlapping{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{ + Id: hashAny([]interface{}{env, "flapping_start", name, startTime}), + }, + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "flapping_start", + }, + FlappingHistoryId: flappingHistoryId, + StartTime: start, + } + + h.EventTime.History = h + allHistory = append(allHistory, h) + } + } + + stages = []icingaDbOutputStage{ + {insert: flappingHistory}, + {upsert: flappingHistoryUpserts}, + {insert: allHistory}, + } + return +} + +type notificationRow = struct { + NotificationId uint64 + NotificationReason uint8 + EndTime sql.NullInt64 + EndTimeUsec uint32 + State uint8 + Output string + LongOutput sql.NullString + ContactsNotified uint16 + ObjecttypeId uint8 + Name1 string + Name2 string +} + +func convertNotificationRows( + env string, envId icingadbTypes.Binary, + selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx, idoRows []notificationRow, +) (stages []icingaDbOutputStage, checkpoint any) { + if len(idoRows) < 1 { + return + } + + var cached []struct { + HistoryId uint64 + PreviousHardState uint8 + } + selectCache( + &cached, "SELECT history_id, previous_hard_state FROM previous_hard_state WHERE history_id BETWEEN ? AND ?", + idoRows[0].NotificationId, idoRows[len(idoRows)-1].NotificationId, + ) + + cachedById := make(map[uint64]uint8, len(cached)) + for _, c := range cached { + cachedById[c.HistoryId] = c.PreviousHardState + } + + var contacts []struct { + NotificationId uint64 + Name1 string + } + + { + var query = ido.Rebind( + "SELECT c.notification_id, o.name1 FROM icinga_contactnotifications c " + + "INNER JOIN icinga_objects o ON o.object_id=c.contact_object_id WHERE c.notification_id BETWEEN ? AND ?", + ) + + err := ido.Select(&contacts, query, idoRows[0].NotificationId, idoRows[len(idoRows)-1].NotificationId) + if err != nil { + log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't perform query")) + } + } + + contactsById := map[uint64]map[string]struct{}{} + for _, contact := range contacts { + perId, ok := contactsById[contact.NotificationId] + if !ok { + perId = map[string]struct{}{} + contactsById[contact.NotificationId] = perId + } + + perId[contact.Name1] = struct{}{} + } + + var notificationHistory, userNotificationHistory, allHistory []contracts.Entity + for _, row := range idoRows { + checkpoint = row.NotificationId + + if !row.EndTime.Valid { + continue + } + + previousHardState, ok := cachedById[row.NotificationId] + if !ok { + continue + } + + // The IDO tracks only sent notifications, but not notification config objects, nor even their names. + // We have to improvise. By the way we avoid unwanted collisions between synced and migrated data via "ID" + // instead of "HOST[!SERVICE]!NOTIFICATION" (ok as this name won't be parsed, but only hashed) and between + // migrated data itself via the history ID as object name, i.e. one "virtual object" per sent notification. + name := strconv.FormatUint(row.NotificationId, 10) + + nt := convertNotificationType(row.NotificationReason, row.State) + + ntEnum, err := nt.Value() + if err != nil { + continue + } + + ts := convertTime(row.EndTime.Int64, row.EndTimeUsec) + tsMilli := float64(ts.Time().UnixMilli()) + notificationHistoryId := hashAny([]interface{}{env, name, ntEnum, tsMilli}) + id := hashAny([]interface{}{env, "notification", name, ntEnum, tsMilli}) + typ := objectTypes[row.ObjecttypeId] + hostId := calcObjectId(env, row.Name1) + serviceId := calcServiceId(env, row.Name1, row.Name2) + + text := row.Output + if row.LongOutput.Valid { + text += "\n\n" + row.LongOutput.String + } + + notificationHistory = append(notificationHistory, &history.NotificationHistory{ + HistoryTableEntity: history.HistoryTableEntity{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{Id: notificationHistoryId}, + }, + }, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + NotificationId: calcObjectId(env, name), + Type: nt, + SendTime: ts, + State: row.State, + PreviousHardState: previousHardState, + Text: icingadbTypes.String{ + NullString: sql.NullString{ + String: text, + Valid: true, + }, + }, + UsersNotified: row.ContactsNotified, + }) + + allHistory = append(allHistory, &history.HistoryNotification{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: id}, + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "notification", + }, + NotificationHistoryId: notificationHistoryId, + EventTime: ts, + }) + + for contact := range contactsById[row.NotificationId] { + userId := calcObjectId(env, contact) + + userNotificationHistory = append(userNotificationHistory, &history.UserNotificationHistory{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{ + Id: utils.Checksum(append(append([]byte(nil), notificationHistoryId...), userId...)), + }, + }, + EnvironmentMeta: v1.EnvironmentMeta{EnvironmentId: envId}, + NotificationHistoryId: notificationHistoryId, + UserId: userId, + }) + } + } + + stages = []icingaDbOutputStage{ + {insert: notificationHistory}, + {insert: userNotificationHistory}, + {insert: allHistory}, + } + return +} + +// convertNotificationType maps IDO values[1] to Icinga DB ones[2]. +// +// [1]: https://github.com/Icinga/icinga2/blob/32c7f7730db154ba0dff5856a8985d125791c/lib/db_ido/dbevents.cpp#L1507-L1524 +// [2]: https://github.com/Icinga/icingadb/blob/8f31ac143875498797725adb9bfacf3d4/pkg/types/notification_type.go#L53-L61 +func convertNotificationType(notificationReason, state uint8) icingadbTypes.NotificationType { + switch notificationReason { + case 0: // state + if state == 0 { + return 64 // recovery + } else { + return 32 // problem + } + case 1: // acknowledgement + return 16 + case 2: // flapping start + return 128 + case 3: // flapping end + return 256 + case 5: // downtime start + return 1 + case 6: // downtime end + return 2 + case 7: // downtime removed + return 4 + case 8: // custom + return 8 + default: // bad notification type + return 0 + } +} + +type stateRow = struct { + StatehistoryId uint64 + StateTime sql.NullInt64 + StateTimeUsec uint32 + State uint8 + StateType uint8 + CurrentCheckAttempt uint16 + MaxCheckAttempts uint16 + LastState uint8 + LastHardState uint8 + Output sql.NullString + LongOutput sql.NullString + CheckSource sql.NullString + ObjecttypeId uint8 + Name1 string + Name2 string +} + +func convertStateRows( + env string, envId icingadbTypes.Binary, + selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []stateRow, +) (stages []icingaDbOutputStage, checkpoint any) { + if len(idoRows) < 1 { + return + } + + var cached []struct { + HistoryId uint64 + PreviousHardState uint8 + } + selectCache( + &cached, "SELECT history_id, previous_hard_state FROM previous_hard_state WHERE history_id BETWEEN ? AND ?", + idoRows[0].StatehistoryId, idoRows[len(idoRows)-1].StatehistoryId, + ) + + cachedById := make(map[uint64]uint8, len(cached)) + for _, c := range cached { + cachedById[c.HistoryId] = c.PreviousHardState + } + + var stateHistory, allHistory, sla []contracts.Entity + for _, row := range idoRows { + checkpoint = row.StatehistoryId + + if !row.StateTime.Valid { + continue + } + + previousHardState, ok := cachedById[row.StatehistoryId] + if !ok { + continue + } + + name := strings.Join([]string{row.Name1, row.Name2}, "!") + ts := convertTime(row.StateTime.Int64, row.StateTimeUsec) + tsMilli := float64(ts.Time().UnixMilli()) + stateHistoryId := hashAny([]interface{}{env, name, tsMilli}) + id := hashAny([]interface{}{env, "state_change", name, tsMilli}) + typ := objectTypes[row.ObjecttypeId] + hostId := calcObjectId(env, row.Name1) + serviceId := calcServiceId(env, row.Name1, row.Name2) + + stateHistory = append(stateHistory, &history.StateHistory{ + HistoryTableEntity: history.HistoryTableEntity{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{Id: stateHistoryId}, + }, + }, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + EventTime: ts, + StateType: icingadbTypes.StateType(row.StateType), + SoftState: row.State, + HardState: row.LastHardState, + PreviousSoftState: row.LastState, + PreviousHardState: previousHardState, + CheckAttempt: uint8(row.CurrentCheckAttempt), + Output: icingadbTypes.String{NullString: row.Output}, + LongOutput: icingadbTypes.String{NullString: row.LongOutput}, + MaxCheckAttempts: uint32(row.MaxCheckAttempts), + CheckSource: icingadbTypes.String{NullString: row.CheckSource}, + }) + + allHistory = append(allHistory, &history.HistoryState{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: id}, + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "state_change", + }, + StateHistoryId: stateHistoryId, + EventTime: ts, + }) + + if icingadbTypes.StateType(row.StateType) == icingadbTypes.StateHard { + // only hard state changes are relevant for SLA history, discard all others + + sla = append(sla, &history.SlaHistoryState{ + HistoryTableEntity: history.HistoryTableEntity{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{Id: stateHistoryId}, + }, + }, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + EventTime: ts, + StateType: icingadbTypes.StateType(row.StateType), + HardState: row.LastHardState, + PreviousHardState: previousHardState, + }) + } + } + + stages = []icingaDbOutputStage{ + {insert: stateHistory}, + {insert: allHistory}, + {insert: sla}, + } + return +} diff --git a/cmd/icingadb-migrate/embed/comment_query.sql b/cmd/icingadb-migrate/embed/comment_query.sql new file mode 100644 index 0000000..774ccf9 --- /dev/null +++ b/cmd/icingadb-migrate/embed/comment_query.sql @@ -0,0 +1,11 @@ +SELECT ch.commenthistory_id, UNIX_TIMESTAMP(ch.entry_time) entry_time, + ch.entry_time_usec, ch.entry_type, ch.author_name, ch.comment_data, ch.is_persistent, + COALESCE(UNIX_TIMESTAMP(ch.expiration_time), 0) expiration_time, + COALESCE(UNIX_TIMESTAMP(ch.deletion_time), 0) deletion_time, + ch.deletion_time_usec, ch.name, o.objecttype_id, o.name1, COALESCE(o.name2, '') name2 +FROM icinga_commenthistory ch USE INDEX (PRIMARY) +INNER JOIN icinga_objects o ON o.object_id=ch.object_id +WHERE ch.commenthistory_id BETWEEN :fromid AND :toid +AND ch.commenthistory_id > :checkpoint -- where we were interrupted +ORDER BY ch.commenthistory_id -- this way we know what has already been migrated from just the last row's ID +LIMIT :bulk diff --git a/cmd/icingadb-migrate/embed/downtime_query.sql b/cmd/icingadb-migrate/embed/downtime_query.sql new file mode 100644 index 0000000..467dfca --- /dev/null +++ b/cmd/icingadb-migrate/embed/downtime_query.sql @@ -0,0 +1,14 @@ +SELECT dh.downtimehistory_id, UNIX_TIMESTAMP(dh.entry_time) entry_time, dh.author_name, dh.comment_data, + dh.is_fixed, dh.duration, UNIX_TIMESTAMP(dh.scheduled_start_time) scheduled_start_time, + COALESCE(UNIX_TIMESTAMP(dh.scheduled_end_time), 0) scheduled_end_time, dh.was_started, + COALESCE(UNIX_TIMESTAMP(dh.actual_start_time), 0) actual_start_time, dh.actual_start_time_usec, + COALESCE(UNIX_TIMESTAMP(dh.actual_end_time), 0) actual_end_time, dh.actual_end_time_usec, dh.was_cancelled, + COALESCE(UNIX_TIMESTAMP(dh.trigger_time), 0) trigger_time, dh.name, o.objecttype_id, + o.name1, COALESCE(o.name2, '') name2, COALESCE(sd.name, '') triggered_by +FROM icinga_downtimehistory dh USE INDEX (PRIMARY) +INNER JOIN icinga_objects o ON o.object_id=dh.object_id +LEFT JOIN icinga_scheduleddowntime sd ON sd.scheduleddowntime_id=dh.triggered_by_id +WHERE dh.downtimehistory_id BETWEEN :fromid AND :toid +AND dh.downtimehistory_id > :checkpoint -- where we were interrupted +ORDER BY dh.downtimehistory_id -- this way we know what has already been migrated from just the last row's ID +LIMIT :bulk diff --git a/cmd/icingadb-migrate/embed/event_time_cache_schema.sql b/cmd/icingadb-migrate/embed/event_time_cache_schema.sql new file mode 100644 index 0000000..5940754 --- /dev/null +++ b/cmd/icingadb-migrate/embed/event_time_cache_schema.sql @@ -0,0 +1,15 @@ +PRAGMA main.auto_vacuum = 1; + +-- Icinga DB's flapping_history#start_time per flapping_end row (IDO's icinga_flappinghistory#flappinghistory_id). +CREATE TABLE IF NOT EXISTS end_start_time ( + history_id INT PRIMARY KEY, + event_time INT NOT NULL, + event_time_usec INT NOT NULL +); + +-- Helper table, the last start_time per icinga_statehistory#object_id. +CREATE TABLE IF NOT EXISTS last_start_time ( + object_id INT PRIMARY KEY, + event_time INT NOT NULL, + event_time_usec INT NOT NULL +); diff --git a/cmd/icingadb-migrate/embed/flapping_query.sql b/cmd/icingadb-migrate/embed/flapping_query.sql new file mode 100644 index 0000000..5e25bde --- /dev/null +++ b/cmd/icingadb-migrate/embed/flapping_query.sql @@ -0,0 +1,9 @@ +SELECT fh.flappinghistory_id, UNIX_TIMESTAMP(fh.event_time) event_time, + fh.event_time_usec, fh.event_type, fh.percent_state_change, fh.low_threshold, + fh.high_threshold, o.objecttype_id, o.name1, COALESCE(o.name2, '') name2 +FROM icinga_flappinghistory fh USE INDEX (PRIMARY) +INNER JOIN icinga_objects o ON o.object_id=fh.object_id +WHERE fh.flappinghistory_id BETWEEN :fromid AND :toid +AND fh.flappinghistory_id > :checkpoint -- where we were interrupted +ORDER BY fh.flappinghistory_id -- this way we know what has already been migrated from just the last row's ID +LIMIT :bulk diff --git a/cmd/icingadb-migrate/embed/ido_migration_progress_schema.sql b/cmd/icingadb-migrate/embed/ido_migration_progress_schema.sql new file mode 100644 index 0000000..54c1c00 --- /dev/null +++ b/cmd/icingadb-migrate/embed/ido_migration_progress_schema.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS ido_migration_progress ( + environment_id CHAR(40) NOT NULL, -- Hex SHA1. Rationale: CHAR(40) is not RDBMS-specific + history_type VARCHAR(63) NOT NULL, + from_ts BIGINT NOT NULL, + to_ts BIGINT NOT NULL, + last_ido_id BIGINT NOT NULL, + + CONSTRAINT pk_ido_migration_progress PRIMARY KEY (environment_id, history_type, from_ts, to_ts) +); diff --git a/cmd/icingadb-migrate/embed/notification_query.sql b/cmd/icingadb-migrate/embed/notification_query.sql new file mode 100644 index 0000000..f963b2e --- /dev/null +++ b/cmd/icingadb-migrate/embed/notification_query.sql @@ -0,0 +1,9 @@ +SELECT n.notification_id, n.notification_reason, UNIX_TIMESTAMP(n.end_time) end_time, + n.end_time_usec, n.state, COALESCE(n.output, '') output, n.long_output, + n.contacts_notified, o.objecttype_id, o.name1, COALESCE(o.name2, '') name2 +FROM icinga_notifications n USE INDEX (PRIMARY) +INNER JOIN icinga_objects o ON o.object_id=n.object_id +WHERE n.notification_id BETWEEN :fromid AND :toid +AND n.notification_id <= :cache_limit AND n.notification_id > :checkpoint -- where we were interrupted +ORDER BY n.notification_id -- this way we know what has already been migrated from just the last row's ID +LIMIT :bulk diff --git a/cmd/icingadb-migrate/embed/previous_hard_state_cache_schema.sql b/cmd/icingadb-migrate/embed/previous_hard_state_cache_schema.sql new file mode 100644 index 0000000..315f22d --- /dev/null +++ b/cmd/icingadb-migrate/embed/previous_hard_state_cache_schema.sql @@ -0,0 +1,22 @@ +PRAGMA main.auto_vacuum = 1; + +-- Icinga DB's state_history#previous_hard_state per IDO's icinga_statehistory#statehistory_id. +CREATE TABLE IF NOT EXISTS previous_hard_state ( + history_id INT PRIMARY KEY, + previous_hard_state INT NOT NULL +); + +-- Helper table, the current last_hard_state per icinga_statehistory#object_id. +CREATE TABLE IF NOT EXISTS next_hard_state ( + object_id INT PRIMARY KEY, + next_hard_state INT NOT NULL +); + +-- Helper table for stashing icinga_statehistory#statehistory_id until last_hard_state changes. +CREATE TABLE IF NOT EXISTS next_ids ( + object_id INT NOT NULL, + history_id INT NOT NULL +); + +CREATE INDEX IF NOT EXISTS next_ids_object_id ON next_ids (object_id); +CREATE INDEX IF NOT EXISTS next_ids_history_id ON next_ids (history_id); diff --git a/cmd/icingadb-migrate/embed/state_query.sql b/cmd/icingadb-migrate/embed/state_query.sql new file mode 100644 index 0000000..3e95d48 --- /dev/null +++ b/cmd/icingadb-migrate/embed/state_query.sql @@ -0,0 +1,9 @@ +SELECT sh.statehistory_id, UNIX_TIMESTAMP(sh.state_time) state_time, sh.state_time_usec, sh.state, + sh.state_type, sh.current_check_attempt, sh.max_check_attempts, sh.last_state, sh.last_hard_state, + sh.output, sh.long_output, sh.check_source, o.objecttype_id, o.name1, COALESCE(o.name2, '') name2 +FROM icinga_statehistory sh USE INDEX (PRIMARY) +INNER JOIN icinga_objects o ON o.object_id=sh.object_id +WHERE sh.statehistory_id BETWEEN :fromid AND :toid +AND sh.statehistory_id <= :cache_limit AND sh.statehistory_id > :checkpoint -- where we were interrupted +ORDER BY sh.statehistory_id -- this way we know what has already been migrated from just the last row's ID +LIMIT :bulk diff --git a/cmd/icingadb-migrate/main.go b/cmd/icingadb-migrate/main.go new file mode 100644 index 0000000..c089e78 --- /dev/null +++ b/cmd/icingadb-migrate/main.go @@ -0,0 +1,488 @@ +package main + +import ( + "context" + "database/sql" + _ "embed" + "encoding/hex" + "fmt" + "github.com/creasty/defaults" + "github.com/goccy/go-yaml" + "github.com/icinga/icingadb/pkg/config" + "github.com/icinga/icingadb/pkg/icingadb" + "github.com/icinga/icingadb/pkg/logging" + icingadbTypes "github.com/icinga/icingadb/pkg/types" + "github.com/icinga/icingadb/pkg/utils" + "github.com/jessevdk/go-flags" + "github.com/jmoiron/sqlx" + "github.com/jmoiron/sqlx/reflectx" + _ "github.com/mattn/go-sqlite3" + "github.com/pkg/errors" + "github.com/vbauerster/mpb/v6" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "math" + "os" + "path" + "path/filepath" + "regexp" + "strings" + "time" +) + +// Flags defines the CLI flags. +type Flags struct { + // Config is the path to the config file. + Config string `short:"c" long:"config" description:"path to config file" required:"true"` + // Cache is a (not necessarily yet existing) directory for caching. + Cache string `short:"t" long:"cache" description:"path for caching" required:"true"` +} + +// Config defines the YAML config structure. +type Config struct { + IDO struct { + config.Database `yaml:"-,inline"` + From int32 `yaml:"from"` + To int32 `yaml:"to" default:"2147483647"` + } `yaml:"ido"` + IcingaDB config.Database `yaml:"icingadb"` + // Icinga2 specifies information the IDO doesn't provide. + Icinga2 struct { + // Env specifies the environment ID, hex. + Env string `yaml:"env"` + } `yaml:"icinga2"` +} + +// main validates the CLI, parses the config and migrates history from IDO to Icinga DB (see comments below). +// Most of the called functions exit the whole program by themselves on non-recoverable errors. +func main() { + f := &Flags{} + if _, err := flags.NewParser(f, flags.Default).Parse(); err != nil { + os.Exit(2) + } + + c, ex := parseConfig(f) + if c == nil { + os.Exit(ex) + } + + envId, err := hex.DecodeString(c.Icinga2.Env) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "bad env ID: %s\n", err.Error()) + os.Exit(2) + } + + defer func() { _ = log.Sync() }() + + log.Info("Starting IDO to Icinga DB history migration") + + ido, idb := connectAll(c) + + if err := idb.CheckSchema(context.Background()); err != nil { + log.Fatalf("%+v", err) + } + + // Start repeatable-read-isolated transactions (consistent SELECTs) + // not to have to care for IDO data changes during migration. + startIdoTx(ido) + + // Prepare the directory structure the following fillCache() will need later. + mkCache(f, c, idb.Mapper) + + log.Info("Computing progress") + + // Convert Config#IDO.From and .To to IDs to restrict data by PK. + computeIdRange(c) + + // computeProgress figures out which data has already been migrated + // not to start from the beginning every time in the following migrate(). + computeProgress(c, idb, envId) + + // On rationale read buildEventTimeCache() and buildPreviousHardStateCache() docs. + log.Info("Filling cache") + fillCache() + + log.Info("Actually migrating") + migrate(c, idb, envId) + + log.Info("Cleaning up cache") + cleanupCache(f) +} + +// parseConfig validates the f.Config file and returns the config and -1 or - on failure - nil and an exit code. +func parseConfig(f *Flags) (_ *Config, exit int) { + cf, err := os.Open(f.Config) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "can't open config file: %s\n", err.Error()) + return nil, 2 + } + defer func() { _ = cf.Close() }() + + c := &Config{} + if err := defaults.Set(c); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "can't set config defaults: %s\n", err.Error()) + return nil, 2 + } + + if err := yaml.NewDecoder(cf, yaml.DisallowUnknownField()).Decode(c); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "can't parse config file: %s\n", err.Error()) + return nil, 2 + } + + return c, -1 +} + +var nonWords = regexp.MustCompile(`\W+`) + +// mkCache ensures <f.Cache>/<history type>.sqlite3 files are present and contain their schema +// and initializes types[*].cache. (On non-recoverable errors the whole program exits.) +func mkCache(f *Flags, c *Config, mapper *reflectx.Mapper) { + log.Info("Preparing cache") + + if err := os.MkdirAll(f.Cache, 0700); err != nil { + log.With("dir", f.Cache).Fatalf("%+v", errors.Wrap(err, "can't create directory")) + } + + types.forEach(func(ht *historyType) { + if ht.cacheSchema == "" { + return + } + + file := path.Join(f.Cache, fmt.Sprintf( + "%s_%d-%d.sqlite3", nonWords.ReplaceAllLiteralString(ht.name, "_"), c.IDO.From, c.IDO.To, + )) + + var err error + + ht.cache, err = sqlx.Open("sqlite3", "file:"+file) + if err != nil { + log.With("file", file).Fatalf("%+v", errors.Wrap(err, "can't open SQLite database")) + } + + ht.cacheFile = file + ht.cache.Mapper = mapper + + if _, err := ht.cache.Exec(ht.cacheSchema); err != nil { + log.With("file", file, "ddl", ht.cacheSchema). + Fatalf("%+v", errors.Wrap(err, "can't import schema into SQLite database")) + } + }) +} + +// connectAll connects to ido and idb (Icinga DB) as c specifies. (On non-recoverable errors the whole program exits.) +func connectAll(c *Config) (ido, idb *icingadb.DB) { + log.Info("Connecting to databases") + eg, _ := errgroup.WithContext(context.Background()) + + eg.Go(func() error { + ido = connect("IDO", &c.IDO.Database) + return nil + }) + + eg.Go(func() error { + idb = connect("Icinga DB", &c.IcingaDB) + return nil + }) + + _ = eg.Wait() + return +} + +// connect connects to which DB as cfg specifies. (On non-recoverable errors the whole program exits.) +func connect(which string, cfg *config.Database) *icingadb.DB { + db, err := cfg.Open(logging.NewLogger(zap.NewNop().Sugar(), 20*time.Second)) + if err != nil { + log.With("backend", which).Fatalf("%+v", errors.Wrap(err, "can't connect to database")) + } + + if err := db.Ping(); err != nil { + log.With("backend", which).Fatalf("%+v", errors.Wrap(err, "can't connect to database")) + } + + return db +} + +// startIdoTx initializes types[*].snapshot with new repeatable-read-isolated ido transactions. +// (On non-recoverable errors the whole program exits.) +func startIdoTx(ido *icingadb.DB) { + types.forEach(func(ht *historyType) { + tx, err := ido.BeginTxx(context.Background(), &sql.TxOptions{Isolation: sql.LevelRepeatableRead}) + if err != nil { + log.Fatalf("%+v", errors.Wrap(err, "can't begin snapshot transaction")) + } + + ht.snapshot = tx + }) +} + +// computeIdRange initializes types[*].fromId and types[*].toId. +// (On non-recoverable errors the whole program exits.) +func computeIdRange(c *Config) { + types.forEach(func(ht *historyType) { + getBorderId := func(id *uint64, timeColumns []string, compOperator string, borderTime int32, sortOrder string) { + deZeroFied := make([]string, 0, len(timeColumns)) + for _, column := range timeColumns { + deZeroFied = append(deZeroFied, fmt.Sprintf( + "CASE WHEN %[1]s < '1970-01-03 00:00:00' THEN NULL ELSE %[1]s END", column, + )) + } + + var timeExpr string + if len(deZeroFied) > 1 { + timeExpr = "COALESCE(" + strings.Join(deZeroFied, ",") + ")" + } else { + timeExpr = deZeroFied[0] + } + + query := ht.snapshot.Rebind( + "SELECT " + ht.idoIdColumn + " FROM " + ht.idoTable + " WHERE " + timeExpr + " " + compOperator + + " FROM_UNIXTIME(?) ORDER BY " + ht.idoIdColumn + " " + sortOrder + " LIMIT 1", + ) + + switch err := ht.snapshot.Get(id, query, borderTime); err { + case nil, sql.ErrNoRows: + default: + log.With("backend", "IDO", "query", query, "args", []any{borderTime}). + Fatalf("%+v", errors.Wrap(err, "can't perform query")) + } + } + + ht.fromId = math.MaxInt64 + + getBorderId(&ht.fromId, ht.idoEndColumns, ">=", c.IDO.From, "ASC") + getBorderId(&ht.toId, ht.idoStartColumns, "<=", c.IDO.To, "DESC") + }) +} + +//go:embed embed/ido_migration_progress_schema.sql +var idoMigrationProgressSchema string + +// computeProgress initializes types[*].lastId, types[*].total and types[*].done. +// (On non-recoverable errors the whole program exits.) +func computeProgress(c *Config, idb *icingadb.DB, envId []byte) { + if _, err := idb.Exec(idoMigrationProgressSchema); err != nil { + log.Fatalf("%+v", errors.Wrap(err, "can't create table ido_migration_progress")) + } + + envIdHex := hex.EncodeToString(envId) + types.forEach(func(ht *historyType) { + var query = idb.Rebind( + "SELECT last_ido_id FROM ido_migration_progress" + + " WHERE environment_id=? AND history_type=? AND from_ts=? AND to_ts=?", + ) + + args := []any{envIdHex, ht.name, c.IDO.From, c.IDO.To} + + if err := idb.Get(&ht.lastId, query, args...); err != nil && err != sql.ErrNoRows { + log.With("backend", "Icinga DB", "query", query, "args", args). + Fatalf("%+v", errors.Wrap(err, "can't perform query")) + } + }) + + types.forEach(func(ht *historyType) { + if ht.cacheFiller != nil { + err := ht.snapshot.Get( + &ht.cacheTotal, + ht.snapshot.Rebind( + // For actual migration icinga_objects will be joined anyway, + // so it makes no sense to take vanished objects into account. + "SELECT COUNT(*) FROM "+ht.idoTable+ + " xh INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+ht.idoIdColumn+" <= ?", + ), + ht.toId, + ) + if err != nil { + log.Fatalf("%+v", errors.Wrap(err, "can't count query")) + } + } + }) + + types.forEach(func(ht *historyType) { + var rows []struct { + Migrated uint8 + Cnt int64 + } + + err := ht.snapshot.Select( + &rows, + ht.snapshot.Rebind( + // For actual migration icinga_objects will be joined anyway, + // so it makes no sense to take vanished objects into account. + "SELECT CASE WHEN xh."+ht.idoIdColumn+"<=? THEN 1 ELSE 0 END migrated, COUNT(*) cnt FROM "+ + ht.idoTable+" xh INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+ + ht.idoIdColumn+" BETWEEN ? AND ? GROUP BY migrated", + ), + ht.lastId, ht.fromId, ht.toId, + ) + if err != nil { + log.Fatalf("%+v", errors.Wrap(err, "can't count query")) + } + + for _, row := range rows { + ht.total += row.Cnt + + if row.Migrated == 1 { + ht.done = row.Cnt + } + } + + log.Infow("Counted migrated IDO events", "type", ht.name, "migrated", ht.done, "total", ht.total) + }) +} + +// fillCache fills <f.Cache>/<history type>.sqlite3 (actually types[*].cacheFiller does). +func fillCache() { + progress := mpb.New() + for _, ht := range types { + if ht.cacheFiller != nil { + ht.setupBar(progress, ht.cacheTotal) + } + } + + types.forEach(func(ht *historyType) { + if ht.cacheFiller != nil { + ht.cacheFiller(ht) + } + }) + + progress.Wait() +} + +// migrate does the actual migration. +func migrate(c *Config, idb *icingadb.DB, envId []byte) { + progress := mpb.New() + for _, ht := range types { + ht.setupBar(progress, ht.total) + } + + types.forEach(func(ht *historyType) { + ht.migrate(c, idb, envId, ht) + }) + + progress.Wait() +} + +// migrate does the actual migration for one history type. +func migrateOneType[IdoRow any]( + c *Config, idb *icingadb.DB, envId []byte, ht *historyType, + convertRows func(env string, envId icingadbTypes.Binary, + selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx, + idoRows []IdoRow) (stages []icingaDbOutputStage, checkpoint any), +) { + var lastQuery string + var lastStmt *sqlx.Stmt + + defer func() { + if lastStmt != nil { + _ = lastStmt.Close() + } + }() + + selectCache := func(dest interface{}, query string, args ...interface{}) { + // Prepare new one, if old one doesn't fit anymore. + if query != lastQuery { + if lastStmt != nil { + _ = lastStmt.Close() + } + + var err error + + lastStmt, err = ht.cache.Preparex(query) + if err != nil { + log.With("backend", "cache", "query", query). + Fatalf("%+v", errors.Wrap(err, "can't prepare query")) + } + + lastQuery = query + } + + if err := lastStmt.Select(dest, args...); err != nil { + log.With("backend", "cache", "query", query, "args", args). + Fatalf("%+v", errors.Wrap(err, "can't perform query")) + } + } + + var args map[string]interface{} + + // For the case that the cache was older that the IDO, + // but ht.cacheFiller couldn't update it, limit (WHERE) our source data set. + if ht.cacheLimitQuery != "" { + var limit sql.NullInt64 + cacheGet(ht.cache, &limit, ht.cacheLimitQuery) + args = map[string]interface{}{"cache_limit": limit.Int64} + } + + upsertProgress, _ := idb.BuildUpsertStmt(&IdoMigrationProgress{}) + envIdHex := hex.EncodeToString(envId) + + ht.bar.SetCurrent(ht.done) + + // Stream IDO rows, ... + sliceIdoHistory( + ht, ht.migrationQuery, args, ht.lastId, + func(idoRows []IdoRow) (checkpoint interface{}) { + // ... convert them, ... + stages, lastIdoId := convertRows(c.Icinga2.Env, envId, selectCache, ht.snapshot, idoRows) + + // ... and insert them: + + for _, stage := range stages { + if len(stage.insert) > 0 { + ch := utils.ChanFromSlice(stage.insert) + + if err := idb.CreateIgnoreStreamed(context.Background(), ch); err != nil { + log.With("backend", "Icinga DB", "op", "INSERT IGNORE", "table", utils.TableName(stage.insert[0])). + Fatalf("%+v", errors.Wrap(err, "can't perform DML")) + } + } + + if len(stage.upsert) > 0 { + ch := utils.ChanFromSlice(stage.upsert) + + if err := idb.UpsertStreamed(context.Background(), ch); err != nil { + log.With("backend", "Icinga DB", "op", "UPSERT", "table", utils.TableName(stage.upsert[0])). + Fatalf("%+v", errors.Wrap(err, "can't perform DML")) + } + } + } + + if lastIdoId != nil { + args := map[string]interface{}{"history_type": ht.name, "last_ido_id": lastIdoId} + + _, err := idb.NamedExec(upsertProgress, &IdoMigrationProgress{ + IdoMigrationProgressUpserter{lastIdoId}, envIdHex, ht.name, c.IDO.From, c.IDO.To, + }) + if err != nil { + log.With("backend", "Icinga DB", "dml", upsertProgress, "args", args). + Fatalf("%+v", errors.Wrap(err, "can't perform DML")) + } + } + + ht.bar.IncrBy(len(idoRows)) + return lastIdoId + }, + ) + + ht.bar.SetTotal(ht.bar.Current(), true) +} + +// cleanupCache removes <f.Cache>/<history type>.sqlite3 files. +func cleanupCache(f *Flags) { + types.forEach(func(ht *historyType) { + if ht.cacheFile != "" { + if err := ht.cache.Close(); err != nil { + log.With("file", ht.cacheFile).Warnf("%+v", errors.Wrap(err, "can't close SQLite database")) + } + } + }) + + if matches, err := filepath.Glob(path.Join(f.Cache, "*.sqlite3")); err == nil { + for _, match := range matches { + if err := os.Remove(match); err != nil { + log.With("file", match).Warnf("%+v", errors.Wrap(err, "can't remove SQLite database")) + } + } + } else { + log.With("dir", f.Cache).Warnf("%+v", errors.Wrap(err, "can't list SQLite databases")) + } +} diff --git a/cmd/icingadb-migrate/misc.go b/cmd/icingadb-migrate/misc.go new file mode 100644 index 0000000..f1db20c --- /dev/null +++ b/cmd/icingadb-migrate/misc.go @@ -0,0 +1,321 @@ +package main + +import ( + "context" + "crypto/sha1" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/driver" + "github.com/icinga/icingadb/pkg/icingadb" + "github.com/icinga/icingadb/pkg/icingadb/objectpacker" + icingadbTypes "github.com/icinga/icingadb/pkg/types" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" + "github.com/vbauerster/mpb/v6" + "github.com/vbauerster/mpb/v6/decor" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "strings" + "time" +) + +type IdoMigrationProgressUpserter struct { + LastIdoId any `json:"last_ido_id"` +} + +// Upsert implements the contracts.Upserter interface. +func (impu *IdoMigrationProgressUpserter) Upsert() interface{} { + return impu +} + +type IdoMigrationProgress struct { + IdoMigrationProgressUpserter `json:",inline"` + EnvironmentId string `json:"environment_id"` + HistoryType string `json:"history_type"` + FromTs int32 `json:"from_ts"` + ToTs int32 `json:"to_ts"` +} + +// Assert interface compliance. +var ( + _ contracts.Upserter = (*IdoMigrationProgressUpserter)(nil) + _ contracts.Upserter = (*IdoMigrationProgress)(nil) +) + +// log is the root logger. +var log = func() *zap.SugaredLogger { + logger, err := zap.NewDevelopmentConfig().Build() + if err != nil { + panic(err) + } + + return logger.Sugar() +}() + +// objectTypes maps IDO values to Icinga DB ones. +var objectTypes = map[uint8]string{1: "host", 2: "service"} + +// hashAny combines objectpacker.PackAny and SHA1 hashing. +func hashAny(in interface{}) []byte { + hash := sha1.New() + if err := objectpacker.PackAny(in, hash); err != nil { + panic(err) + } + + return hash.Sum(nil) +} + +// convertTime converts *nix timestamps from the IDO for Icinga DB. +func convertTime(ts int64, tsUs uint32) icingadbTypes.UnixMilli { + if ts == 0 && tsUs == 0 { + return icingadbTypes.UnixMilli{} + } + + return icingadbTypes.UnixMilli(time.Unix(ts, int64(tsUs)*int64(time.Microsecond/time.Nanosecond))) +} + +// calcObjectId calculates the ID of the config object named name1 for Icinga DB. +func calcObjectId(env, name1 string) []byte { + if name1 == "" { + return nil + } + + return hashAny([2]string{env, name1}) +} + +// calcServiceId calculates the ID of the service name2 of the host name1 for Icinga DB. +func calcServiceId(env, name1, name2 string) []byte { + if name2 == "" { + return nil + } + + return hashAny([2]string{env, name1 + "!" + name2}) +} + +// sliceIdoHistory performs query with args+fromid,toid,checkpoint,bulk on ht.snapshot +// and passes the results to onRows until either an empty result set or onRows() returns nil. +// Rationale: split the likely large result set of a query by adding a WHERE condition and a LIMIT, +// both with :named placeholders (:checkpoint, :bulk). +// checkpoint is the initial value for the WHERE condition, onRows() returns follow-up ones. +// (On non-recoverable errors the whole program exits.) +func sliceIdoHistory[Row any]( + ht *historyType, query string, args map[string]any, + checkpoint interface{}, onRows func([]Row) (checkpoint interface{}), +) { + if args == nil { + args = map[string]interface{}{} + } + + args["fromid"] = ht.fromId + args["toid"] = ht.toId + args["checkpoint"] = checkpoint + args["bulk"] = 20000 + + if ht.snapshot.DriverName() != driver.MySQL { + query = strings.ReplaceAll(query, " USE INDEX (PRIMARY)", "") + } + + for { + // TODO: use Tx#SelectNamed() one nice day (https://github.com/jmoiron/sqlx/issues/779) + stmt, err := ht.snapshot.PrepareNamed(query) + if err != nil { + log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't prepare query")) + } + + var rows []Row + if err := stmt.Select(&rows, args); err != nil { + log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't perform query")) + } + + _ = stmt.Close() + + if len(rows) < 1 { + break + } + + if checkpoint = onRows(rows); checkpoint == nil { + break + } + + args["checkpoint"] = checkpoint + } +} + +type progressBar struct { + *mpb.Bar + + lastUpdate time.Time +} + +// IncrBy does pb.Bar.DecoratorEwmaUpdate() automatically. +func (pb *progressBar) IncrBy(n int) { + pb.Bar.IncrBy(n) + + now := time.Now() + + if !pb.lastUpdate.IsZero() { + pb.Bar.DecoratorEwmaUpdate(now.Sub(pb.lastUpdate)) + } + + pb.lastUpdate = now +} + +// historyType specifies a history data type. +type historyType struct { + // name is a human-readable common name. + name string + // idoTable specifies the source table. + idoTable string + // idoIdColumn specifies idoTable's primary key. + idoIdColumn string + // idoStartColumns specifies idoTable's event start time locations. (First non-NULL is used.) + idoStartColumns []string + // idoEndColumns specifies idoTable's event end time locations. (First non-NULL is used.) + idoEndColumns []string + // cacheSchema specifies <name>.sqlite3's structure. + cacheSchema string + // cacheFiller fills cache from snapshot. + cacheFiller func(*historyType) + // cacheLimitQuery rationale: see migrate(). + cacheLimitQuery string + // migrationQuery SELECTs source data for actual migration. + migrationQuery string + // migrate does the actual migration. + migrate func(c *Config, idb *icingadb.DB, envId []byte, ht *historyType) + + // cacheFile locates <name>.sqlite3. + cacheFile string + // cache represents <cacheFile>. + cache *sqlx.DB + // snapshot represents the data source. + snapshot *sqlx.Tx + // fromId is the first IDO row ID to migrate. + fromId uint64 + // toId is the last IDO row ID to migrate. + toId uint64 + // total summarizes the source data. + total int64 + // cacheTotal summarizes the cache source data. + cacheTotal int64 + // done summarizes the migrated data. + done int64 + // bar represents the current progress bar. + bar *progressBar + // lastId is the last already migrated ID. + lastId uint64 +} + +// setupBar (re-)initializes ht.bar. +func (ht *historyType) setupBar(progress *mpb.Progress, total int64) { + ht.bar = &progressBar{Bar: progress.AddBar( + total, + mpb.BarFillerClearOnComplete(), + mpb.PrependDecorators( + decor.Name(ht.name, decor.WC{W: len(ht.name) + 1, C: decor.DidentRight}), + decor.Percentage(decor.WC{W: 5}), + ), + mpb.AppendDecorators( + decor.EwmaETA(decor.ET_STYLE_GO, 0, decor.WC{W: 4}), + decor.Name(" "), + decor.EwmaSpeed(0, "%.0f/s", 0, decor.WC{W: 4}), + ), + )} +} + +type historyTypes []*historyType + +// forEach performs f per hts in parallel. +func (hts historyTypes) forEach(f func(*historyType)) { + eg, _ := errgroup.WithContext(context.Background()) + for _, ht := range hts { + ht := ht + eg.Go(func() error { + f(ht) + return nil + }) + } + + _ = eg.Wait() +} + +type icingaDbOutputStage struct { + insert, upsert []contracts.Entity +} + +var types = historyTypes{ + { + name: "ack & comment", + idoTable: "icinga_commenthistory", + idoIdColumn: "commenthistory_id", + idoStartColumns: []string{"entry_time"}, + // Manual deletion time wins vs. time of expiration which never happens due to manual deletion. + idoEndColumns: []string{"deletion_time", "expiration_time"}, + migrationQuery: commentMigrationQuery, + migrate: func(c *Config, idb *icingadb.DB, envId []byte, ht *historyType) { + migrateOneType(c, idb, envId, ht, convertCommentRows) + }, + }, + { + name: "downtime", + idoTable: "icinga_downtimehistory", + idoIdColumn: "downtimehistory_id", + // Fall back to scheduled time if actual time is missing. + idoStartColumns: []string{"actual_start_time", "scheduled_start_time"}, + idoEndColumns: []string{"actual_end_time", "scheduled_end_time"}, + migrationQuery: downtimeMigrationQuery, + migrate: func(c *Config, idb *icingadb.DB, envId []byte, ht *historyType) { + migrateOneType(c, idb, envId, ht, convertDowntimeRows) + }, + }, + { + name: "flapping", + idoTable: "icinga_flappinghistory", + idoIdColumn: "flappinghistory_id", + idoStartColumns: []string{"event_time"}, + idoEndColumns: []string{"event_time"}, + cacheSchema: eventTimeCacheSchema, + cacheFiller: func(ht *historyType) { + buildEventTimeCache(ht, []string{ + "xh.flappinghistory_id id", "UNIX_TIMESTAMP(xh.event_time) event_time", + "xh.event_time_usec", "1001-xh.event_type event_is_start", "xh.object_id", + }) + }, + migrationQuery: flappingMigrationQuery, + migrate: func(c *Config, idb *icingadb.DB, envId []byte, ht *historyType) { + migrateOneType(c, idb, envId, ht, convertFlappingRows) + }, + }, + { + name: "notification", + idoTable: "icinga_notifications", + idoIdColumn: "notification_id", + idoStartColumns: []string{"start_time"}, + idoEndColumns: []string{"end_time"}, + cacheSchema: previousHardStateCacheSchema, + cacheFiller: func(ht *historyType) { + buildPreviousHardStateCache(ht, []string{ + "xh.notification_id id", "xh.object_id", "xh.state last_hard_state", + }) + }, + cacheLimitQuery: "SELECT MAX(history_id) FROM previous_hard_state", + migrationQuery: notificationMigrationQuery, + migrate: func(c *Config, idb *icingadb.DB, envId []byte, ht *historyType) { + migrateOneType(c, idb, envId, ht, convertNotificationRows) + }, + }, + { + name: "state", + idoTable: "icinga_statehistory", + idoIdColumn: "statehistory_id", + idoStartColumns: []string{"state_time"}, + idoEndColumns: []string{"state_time"}, + cacheSchema: previousHardStateCacheSchema, + cacheFiller: func(ht *historyType) { + buildPreviousHardStateCache(ht, []string{"xh.statehistory_id id", "xh.object_id", "xh.last_hard_state"}) + }, + cacheLimitQuery: "SELECT MAX(history_id) FROM previous_hard_state", + migrationQuery: stateMigrationQuery, + migrate: func(c *Config, idb *icingadb.DB, envId []byte, ht *historyType) { + migrateOneType(c, idb, envId, ht, convertStateRows) + }, + }, +} diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go new file mode 100644 index 0000000..77ce577 --- /dev/null +++ b/cmd/icingadb/main.go @@ -0,0 +1,403 @@ +package main + +import ( + "context" + "fmt" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/internal/command" + "github.com/icinga/icingadb/pkg/common" + "github.com/icinga/icingadb/pkg/icingadb" + "github.com/icinga/icingadb/pkg/icingadb/history" + "github.com/icinga/icingadb/pkg/icingadb/overdue" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/utils" + "github.com/okzk/sdnotify" + "github.com/pkg/errors" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "os" + "os/signal" + "sync" + "sync/atomic" + "syscall" + "time" +) + +const ( + ExitSuccess = 0 + ExitFailure = 1 + expectedRedisSchemaVersion = "5" +) + +func main() { + os.Exit(run()) +} + +func run() int { + cmd := command.New() + logs, err := logging.NewLogging( + utils.AppName(), + cmd.Config.Logging.Level, + cmd.Config.Logging.Output, + cmd.Config.Logging.Options, + cmd.Config.Logging.Interval, + ) + if err != nil { + utils.Fatal(errors.Wrap(err, "can't configure logging")) + } + // When started by systemd, NOTIFY_SOCKET is set by systemd for Type=notify supervised services, which is the + // default setting for the Icinga DB service. So we notify that Icinga DB finished starting up. + _ = sdnotify.Ready() + + logger := logs.GetLogger() + defer logger.Sync() + + if warn := cmd.Config.DecodeWarning; warn != nil { + logger.Warnf("ignoring unknown config option, this will become a fatal error in Icinga DB v1.2:\n\n%v", warn) + } + + logger.Info("Starting Icinga DB") + + db, err := cmd.Database(logs.GetChildLogger("database")) + if err != nil { + logger.Fatalf("%+v", errors.Wrap(err, "can't create database connection pool from config")) + } + defer db.Close() + { + logger.Infof("Connecting to database at '%s'", utils.JoinHostPort(cmd.Config.Database.Host, cmd.Config.Database.Port)) + err := db.Ping() + if err != nil { + logger.Fatalf("%+v", errors.Wrap(err, "can't connect to database")) + } + } + + if err := db.CheckSchema(context.Background()); err != nil { + logger.Fatalf("%+v", err) + } + + rc, err := cmd.Redis(logs.GetChildLogger("redis")) + if err != nil { + logger.Fatalf("%+v", errors.Wrap(err, "can't create Redis client from config")) + } + { + logger.Infof("Connecting to Redis at '%s'", utils.JoinHostPort(cmd.Config.Redis.Host, cmd.Config.Redis.Port)) + _, err := rc.Ping(context.Background()).Result() + if err != nil { + logger.Fatalf("%+v", errors.Wrap(err, "can't connect to Redis")) + } + } + + { + pos, err := checkRedisSchema(logger, rc, "0-0") + if err != nil { + logger.Fatalf("%+v", err) + } + + go monitorRedisSchema(logger, rc, pos) + } + + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + + // Use dedicated connections for heartbeat and HA to ensure that heartbeats are always processed and + // the instance table is updated. Otherwise, the connections can be too busy due to the synchronization of + // configuration, status, history, etc., which can lead to handover / takeover loops because + // the heartbeat is not read while HA gets stuck when updating the instance table. + var heartbeat *icingaredis.Heartbeat + var ha *icingadb.HA + { + rc, err := cmd.Redis(logs.GetChildLogger("redis")) + if err != nil { + logger.Fatalf("%+v", errors.Wrap(err, "can't create Redis client from config")) + } + heartbeat = icingaredis.NewHeartbeat(ctx, rc, logs.GetChildLogger("heartbeat")) + + db, err := cmd.Database(logs.GetChildLogger("database")) + if err != nil { + logger.Fatalf("%+v", errors.Wrap(err, "can't create database connection pool from config")) + } + defer db.Close() + ha = icingadb.NewHA(ctx, db, heartbeat, logs.GetChildLogger("high-availability")) + + telemetryLogger := logs.GetChildLogger("telemetry") + telemetry.StartHeartbeat(ctx, rc, telemetryLogger, ha, heartbeat) + telemetry.WriteStats(ctx, rc, telemetryLogger) + } + // Closing ha on exit ensures that this instance retracts its heartbeat + // from the database so that another instance can take over immediately. + defer func() { + // Give up after 3s, not 5m (default) not to hang for 5m if DB is down. + ctx, cancelCtx := context.WithTimeout(context.Background(), 3*time.Second) + + ha.Close(ctx) + cancelCtx() + }() + s := icingadb.NewSync(db, rc, logs.GetChildLogger("config-sync")) + hs := history.NewSync(db, rc, logs.GetChildLogger("history-sync")) + rt := icingadb.NewRuntimeUpdates(db, rc, logs.GetChildLogger("runtime-updates")) + ods := overdue.NewSync(db, rc, logs.GetChildLogger("overdue-sync")) + ret := history.NewRetention( + db, + cmd.Config.Retention.HistoryDays, + cmd.Config.Retention.SlaDays, + cmd.Config.Retention.Interval, + cmd.Config.Retention.Count, + cmd.Config.Retention.Options, + logs.GetChildLogger("retention"), + ) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) + + go func() { + logger.Info("Starting history sync") + + if err := hs.Sync(ctx); err != nil && !utils.IsContextCanceled(err) { + logger.Fatalf("%+v", err) + } + }() + + // Main loop + for { + hactx, cancelHactx := context.WithCancel(ctx) + for hactx.Err() == nil { + select { + case <-ha.Takeover(): + logger.Info("Taking over") + + go func() { + for hactx.Err() == nil { + synctx, cancelSynctx := context.WithCancel(ha.Environment().NewContext(hactx)) + g, synctx := errgroup.WithContext(synctx) + // WaitGroups for initial synchronization. + // Runtime updates must wait for initial synchronization to complete. + configInitSync := sync.WaitGroup{} + stateInitSync := &sync.WaitGroup{} + + // Clear the runtime update streams before starting anything else (rather than after the sync), + // otherwise updates may be lost. + runtimeConfigUpdateStreams, runtimeStateUpdateStreams, err := rt.ClearStreams(synctx) + if err != nil { + logger.Fatalf("%+v", err) + } + + dump := icingadb.NewDumpSignals(rc, logs.GetChildLogger("dump-signals")) + g.Go(func() error { + logger.Debug("Staring config dump signal handling") + + return dump.Listen(synctx) + }) + + g.Go(func() error { + select { + case <-dump.InProgress(): + logger.Info("Icinga 2 started a new config dump, waiting for it to complete") + cancelSynctx() + + return nil + case <-synctx.Done(): + return synctx.Err() + } + }) + + g.Go(func() error { + logger.Info("Starting overdue sync") + + return ods.Sync(synctx) + }) + + syncStart := time.Now() + atomic.StoreInt64(&telemetry.OngoingSyncStartMilli, syncStart.UnixMilli()) + + logger.Info("Starting config sync") + for _, factory := range v1.ConfigFactories { + factory := factory + + configInitSync.Add(1) + g.Go(func() error { + defer configInitSync.Done() + + return s.SyncAfterDump(synctx, common.NewSyncSubject(factory), dump) + }) + } + logger.Info("Starting initial state sync") + for _, factory := range v1.StateFactories { + factory := factory + + stateInitSync.Add(1) + g.Go(func() error { + defer stateInitSync.Done() + + return s.SyncAfterDump(synctx, common.NewSyncSubject(factory), dump) + }) + } + + configInitSync.Add(1) + g.Go(func() error { + defer configInitSync.Done() + + select { + case <-dump.Done("icinga:customvar"): + case <-synctx.Done(): + return synctx.Err() + } + + return s.SyncCustomvars(synctx) + }) + + g.Go(func() error { + configInitSync.Wait() + atomic.StoreInt64(&telemetry.OngoingSyncStartMilli, 0) + + syncEnd := time.Now() + elapsed := syncEnd.Sub(syncStart) + logger := logs.GetChildLogger("config-sync") + + if synctx.Err() == nil { + telemetry.LastSuccessfulSync.Store(telemetry.SuccessfulSync{ + FinishMilli: syncEnd.UnixMilli(), + DurationMilli: elapsed.Milliseconds(), + }) + + logger.Infof("Finished config sync in %s", elapsed) + } else { + logger.Warnf("Aborted config sync after %s", elapsed) + } + + return nil + }) + + g.Go(func() error { + stateInitSync.Wait() + + elapsed := time.Since(syncStart) + logger := logs.GetChildLogger("config-sync") + if synctx.Err() == nil { + logger.Infof("Finished initial state sync in %s", elapsed) + } else { + logger.Warnf("Aborted initial state sync after %s", elapsed) + } + + return nil + }) + + g.Go(func() error { + configInitSync.Wait() + + if err := synctx.Err(); err != nil { + return err + } + + logger.Info("Starting config runtime updates sync") + + return rt.Sync(synctx, v1.ConfigFactories, runtimeConfigUpdateStreams, false) + }) + + g.Go(func() error { + stateInitSync.Wait() + + if err := synctx.Err(); err != nil { + return err + } + + logger.Info("Starting state runtime updates sync") + + return rt.Sync(synctx, v1.StateFactories, runtimeStateUpdateStreams, true) + }) + + g.Go(func() error { + // Wait for config and state sync to avoid putting additional pressure on the database. + configInitSync.Wait() + stateInitSync.Wait() + + if err := synctx.Err(); err != nil { + return err + } + + logger.Info("Starting history retention") + + return ret.Start(synctx) + }) + + if err := g.Wait(); err != nil && !utils.IsContextCanceled(err) { + logger.Fatalf("%+v", err) + } + } + }() + case <-ha.Handover(): + logger.Warn("Handing over") + + cancelHactx() + case <-hactx.Done(): + // Nothing to do here, surrounding loop will terminate now. + case <-ha.Done(): + if err := ha.Err(); err != nil { + logger.Fatalf("%+v", errors.Wrap(err, "HA exited with an error")) + } else if ctx.Err() == nil { + // ha is created as a single instance once. It should only exit if the main context is cancelled, + // otherwise there is no way to get Icinga DB back into a working state. + logger.Fatalf("%+v", errors.New("HA exited without an error but main context isn't cancelled")) + } + cancelHactx() + + return ExitFailure + case <-ctx.Done(): + logger.Fatalf("%+v", errors.New("main context closed unexpectedly")) + case s := <-sig: + logger.Infow("Exiting due to signal", zap.String("signal", s.String())) + cancelHactx() + + return ExitSuccess + } + } + + cancelHactx() + } +} + +// monitorRedisSchema monitors rc's icinga:schema version validity. +func monitorRedisSchema(logger *logging.Logger, rc *icingaredis.Client, pos string) { + for { + var err error + pos, err = checkRedisSchema(logger, rc, pos) + + if err != nil { + logger.Fatalf("%+v", err) + } + } +} + +// checkRedisSchema verifies rc's icinga:schema version. +func checkRedisSchema(logger *logging.Logger, rc *icingaredis.Client, pos string) (newPos string, err error) { + if pos == "0-0" { + defer time.AfterFunc(3*time.Second, func() { + logger.Info("Waiting for Icinga 2 to write into Redis, please make sure you have started Icinga 2 and the Icinga DB feature is enabled") + }).Stop() + } else { + logger.Debug("Checking Icinga 2 and Icinga DB compatibility") + } + + streams, err := rc.XReadUntilResult(context.Background(), &redis.XReadArgs{ + Streams: []string{"icinga:schema", pos}, + }) + if err != nil { + return "", errors.Wrap(err, "can't read Redis schema version") + } + + message := streams[0].Messages[0] + if version := message.Values["version"]; version != expectedRedisSchemaVersion { + // Since these error messages are trivial and mostly caused by users, we don't need + // to print a stack trace here. However, since errors.Errorf() does this automatically, + // we need to use fmt instead. + return "", fmt.Errorf( + "unexpected Redis schema version: %q (expected %q), please make sure you are running compatible"+ + " versions of Icinga 2 and Icinga DB", version, expectedRedisSchemaVersion, + ) + } + + logger.Debug("Redis schema version is correct") + return message.ID, nil +} |