package main import ( "context" "database/sql" _ "embed" "encoding/hex" "fmt" "github.com/creasty/defaults" "github.com/goccy/go-yaml" "github.com/icinga/icingadb/pkg/config" "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" icingadbTypes "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" "github.com/jessevdk/go-flags" "github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx/reflectx" _ "github.com/mattn/go-sqlite3" "github.com/pkg/errors" "github.com/vbauerster/mpb/v6" "go.uber.org/zap" "golang.org/x/sync/errgroup" "math" "os" "path" "path/filepath" "regexp" "strings" "time" ) // Flags defines the CLI flags. type Flags struct { // Config is the path to the config file. Config string `short:"c" long:"config" description:"path to config file" required:"true"` // Cache is a (not necessarily yet existing) directory for caching. Cache string `short:"t" long:"cache" description:"path for caching" required:"true"` } // Config defines the YAML config structure. type Config struct { IDO struct { config.Database `yaml:"-,inline"` From int32 `yaml:"from"` To int32 `yaml:"to" default:"2147483647"` } `yaml:"ido"` IcingaDB config.Database `yaml:"icingadb"` // Icinga2 specifies information the IDO doesn't provide. Icinga2 struct { // Env specifies the environment ID, hex. Env string `yaml:"env"` } `yaml:"icinga2"` } // main validates the CLI, parses the config and migrates history from IDO to Icinga DB (see comments below). // Most of the called functions exit the whole program by themselves on non-recoverable errors. func main() { f := &Flags{} if _, err := flags.NewParser(f, flags.Default).Parse(); err != nil { os.Exit(2) } c, ex := parseConfig(f) if c == nil { os.Exit(ex) } envId, err := hex.DecodeString(c.Icinga2.Env) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "bad env ID: %s\n", err.Error()) os.Exit(2) } defer func() { _ = log.Sync() }() log.Info("Starting IDO to Icinga DB history migration") ido, idb := connectAll(c) if err := idb.CheckSchema(context.Background()); err != nil { log.Fatalf("%+v", err) } // Start repeatable-read-isolated transactions (consistent SELECTs) // not to have to care for IDO data changes during migration. startIdoTx(ido) // Prepare the directory structure the following fillCache() will need later. mkCache(f, c, idb.Mapper) log.Info("Computing progress") // Convert Config#IDO.From and .To to IDs to restrict data by PK. computeIdRange(c) // computeProgress figures out which data has already been migrated // not to start from the beginning every time in the following migrate(). computeProgress(c, idb, envId) // On rationale read buildEventTimeCache() and buildPreviousHardStateCache() docs. log.Info("Filling cache") fillCache() log.Info("Actually migrating") migrate(c, idb, envId) log.Info("Cleaning up cache") cleanupCache(f) } // parseConfig validates the f.Config file and returns the config and -1 or - on failure - nil and an exit code. func parseConfig(f *Flags) (_ *Config, exit int) { cf, err := os.Open(f.Config) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "can't open config file: %s\n", err.Error()) return nil, 2 } defer func() { _ = cf.Close() }() c := &Config{} if err := defaults.Set(c); err != nil { _, _ = fmt.Fprintf(os.Stderr, "can't set config defaults: %s\n", err.Error()) return nil, 2 } if err := yaml.NewDecoder(cf).Decode(c); err != nil { _, _ = fmt.Fprintf(os.Stderr, "can't parse config file: %s\n", err.Error()) return nil, 2 } return c, -1 } var nonWords = regexp.MustCompile(`\W+`) // mkCache ensures /.sqlite3 files are present and contain their schema // and initializes types[*].cache. (On non-recoverable errors the whole program exits.) func mkCache(f *Flags, c *Config, mapper *reflectx.Mapper) { log.Info("Preparing cache") if err := os.MkdirAll(f.Cache, 0700); err != nil { log.With("dir", f.Cache).Fatalf("%+v", errors.Wrap(err, "can't create directory")) } types.forEach(func(ht *historyType) { if ht.cacheSchema == "" { return } file := path.Join(f.Cache, fmt.Sprintf( "%s_%d-%d.sqlite3", nonWords.ReplaceAllLiteralString(ht.name, "_"), c.IDO.From, c.IDO.To, )) var err error ht.cache, err = sqlx.Open("sqlite3", "file:"+file) if err != nil { log.With("file", file).Fatalf("%+v", errors.Wrap(err, "can't open SQLite database")) } ht.cacheFile = file ht.cache.Mapper = mapper if _, err := ht.cache.Exec(ht.cacheSchema); err != nil { log.With("file", file, "ddl", ht.cacheSchema). Fatalf("%+v", errors.Wrap(err, "can't import schema into SQLite database")) } }) } // connectAll connects to ido and idb (Icinga DB) as c specifies. (On non-recoverable errors the whole program exits.) func connectAll(c *Config) (ido, idb *icingadb.DB) { log.Info("Connecting to databases") eg, _ := errgroup.WithContext(context.Background()) eg.Go(func() error { ido = connect("IDO", &c.IDO.Database) return nil }) eg.Go(func() error { idb = connect("Icinga DB", &c.IcingaDB) return nil }) _ = eg.Wait() return } // connect connects to which DB as cfg specifies. (On non-recoverable errors the whole program exits.) func connect(which string, cfg *config.Database) *icingadb.DB { db, err := cfg.Open(logging.NewLogger(zap.NewNop().Sugar(), 20*time.Second)) if err != nil { log.With("backend", which).Fatalf("%+v", errors.Wrap(err, "can't connect to database")) } if err := db.Ping(); err != nil { log.With("backend", which).Fatalf("%+v", errors.Wrap(err, "can't connect to database")) } return db } // startIdoTx initializes types[*].snapshot with new repeatable-read-isolated ido transactions. // (On non-recoverable errors the whole program exits.) func startIdoTx(ido *icingadb.DB) { types.forEach(func(ht *historyType) { tx, err := ido.BeginTxx(context.Background(), &sql.TxOptions{Isolation: sql.LevelRepeatableRead}) if err != nil { log.Fatalf("%+v", errors.Wrap(err, "can't begin snapshot transaction")) } ht.snapshot = tx }) } // computeIdRange initializes types[*].fromId and types[*].toId. // (On non-recoverable errors the whole program exits.) func computeIdRange(c *Config) { types.forEach(func(ht *historyType) { getBorderId := func(id *uint64, timeColumns []string, compOperator string, borderTime int32, sortOrder string) { deZeroFied := make([]string, 0, len(timeColumns)) for _, column := range timeColumns { deZeroFied = append(deZeroFied, fmt.Sprintf( "CASE WHEN %[1]s < '1970-01-03 00:00:00' THEN NULL ELSE %[1]s END", column, )) } var timeExpr string if len(deZeroFied) > 1 { timeExpr = "COALESCE(" + strings.Join(deZeroFied, ",") + ")" } else { timeExpr = deZeroFied[0] } query := ht.snapshot.Rebind( "SELECT " + ht.idoIdColumn + " FROM " + ht.idoTable + " WHERE " + timeExpr + " " + compOperator + " FROM_UNIXTIME(?) ORDER BY " + ht.idoIdColumn + " " + sortOrder + " LIMIT 1", ) switch err := ht.snapshot.Get(id, query, borderTime); err { case nil, sql.ErrNoRows: default: log.With("backend", "IDO", "query", query, "args", []any{borderTime}). Fatalf("%+v", errors.Wrap(err, "can't perform query")) } } ht.fromId = math.MaxInt64 getBorderId(&ht.fromId, ht.idoEndColumns, ">=", c.IDO.From, "ASC") getBorderId(&ht.toId, ht.idoStartColumns, "<=", c.IDO.To, "DESC") }) } //go:embed embed/ido_migration_progress_schema.sql var idoMigrationProgressSchema string // computeProgress initializes types[*].lastId, types[*].total and types[*].done. // (On non-recoverable errors the whole program exits.) func computeProgress(c *Config, idb *icingadb.DB, envId []byte) { if _, err := idb.Exec(idoMigrationProgressSchema); err != nil { log.Fatalf("%+v", errors.Wrap(err, "can't create table ido_migration_progress")) } envIdHex := hex.EncodeToString(envId) types.forEach(func(ht *historyType) { var query = idb.Rebind( "SELECT last_ido_id FROM ido_migration_progress" + " WHERE environment_id=? AND history_type=? AND from_ts=? AND to_ts=?", ) args := []any{envIdHex, ht.name, c.IDO.From, c.IDO.To} if err := idb.Get(&ht.lastId, query, args...); err != nil && err != sql.ErrNoRows { log.With("backend", "Icinga DB", "query", query, "args", args). Fatalf("%+v", errors.Wrap(err, "can't perform query")) } }) types.forEach(func(ht *historyType) { if ht.cacheFiller != nil { err := ht.snapshot.Get( &ht.cacheTotal, ht.snapshot.Rebind( // For actual migration icinga_objects will be joined anyway, // so it makes no sense to take vanished objects into account. "SELECT COUNT(*) FROM "+ht.idoTable+ " xh INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+ht.idoIdColumn+" <= ?", ), ht.toId, ) if err != nil { log.Fatalf("%+v", errors.Wrap(err, "can't count query")) } } }) types.forEach(func(ht *historyType) { var rows []struct { Migrated uint8 Cnt int64 } err := ht.snapshot.Select( &rows, ht.snapshot.Rebind( // For actual migration icinga_objects will be joined anyway, // so it makes no sense to take vanished objects into account. "SELECT CASE WHEN xh."+ht.idoIdColumn+"<=? THEN 1 ELSE 0 END migrated, COUNT(*) cnt FROM "+ ht.idoTable+" xh INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+ ht.idoIdColumn+" BETWEEN ? AND ? GROUP BY migrated", ), ht.lastId, ht.fromId, ht.toId, ) if err != nil { log.Fatalf("%+v", errors.Wrap(err, "can't count query")) } for _, row := range rows { ht.total += row.Cnt if row.Migrated == 1 { ht.done = row.Cnt } } log.Infow("Counted migrated IDO events", "type", ht.name, "migrated", ht.done, "total", ht.total) }) } // fillCache fills /.sqlite3 (actually types[*].cacheFiller does). func fillCache() { progress := mpb.New() for _, ht := range types { if ht.cacheFiller != nil { ht.setupBar(progress, ht.cacheTotal) } } types.forEach(func(ht *historyType) { if ht.cacheFiller != nil { ht.cacheFiller(ht) } }) progress.Wait() } // migrate does the actual migration. func migrate(c *Config, idb *icingadb.DB, envId []byte) { progress := mpb.New() for _, ht := range types { ht.setupBar(progress, ht.total) } types.forEach(func(ht *historyType) { ht.migrate(c, idb, envId, ht) }) progress.Wait() } // migrate does the actual migration for one history type. func migrateOneType[IdoRow any]( c *Config, idb *icingadb.DB, envId []byte, ht *historyType, convertRows func(env string, envId icingadbTypes.Binary, selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx, idoRows []IdoRow) (icingaDbInserts, icingaDbUpserts [][]contracts.Entity, checkpoint any), ) { var lastQuery string var lastStmt *sqlx.Stmt defer func() { if lastStmt != nil { _ = lastStmt.Close() } }() selectCache := func(dest interface{}, query string, args ...interface{}) { // Prepare new one, if old one doesn't fit anymore. if query != lastQuery { if lastStmt != nil { _ = lastStmt.Close() } var err error lastStmt, err = ht.cache.Preparex(query) if err != nil { log.With("backend", "cache", "query", query). Fatalf("%+v", errors.Wrap(err, "can't prepare query")) } lastQuery = query } if err := lastStmt.Select(dest, args...); err != nil { log.With("backend", "cache", "query", query, "args", args). Fatalf("%+v", errors.Wrap(err, "can't perform query")) } } var args map[string]interface{} // For the case that the cache was older that the IDO, // but ht.cacheFiller couldn't update it, limit (WHERE) our source data set. if ht.cacheLimitQuery != "" { var limit sql.NullInt64 cacheGet(ht.cache, &limit, ht.cacheLimitQuery) args = map[string]interface{}{"cache_limit": limit.Int64} } upsertProgress, _ := idb.BuildUpsertStmt(&IdoMigrationProgress{}) envIdHex := hex.EncodeToString(envId) ht.bar.SetCurrent(ht.done) // Stream IDO rows, ... sliceIdoHistory( ht, ht.migrationQuery, args, ht.lastId, func(idoRows []IdoRow) (checkpoint interface{}) { // ... convert them, ... inserts, upserts, lastIdoId := convertRows(c.Icinga2.Env, envId, selectCache, ht.snapshot, idoRows) // ... and insert them: for _, op := range []struct { kind string data [][]contracts.Entity streamer func(context.Context, <-chan contracts.Entity, ...icingadb.OnSuccess[contracts.Entity]) error }{{"INSERT IGNORE", inserts, idb.CreateIgnoreStreamed}, {"UPSERT", upserts, idb.UpsertStreamed}} { for _, table := range op.data { if len(table) < 1 { continue } ch := make(chan contracts.Entity, len(table)) for _, row := range table { ch <- row } close(ch) if err := op.streamer(context.Background(), ch); err != nil { log.With("backend", "Icinga DB", "op", op.kind, "table", utils.TableName(table[0])). Fatalf("%+v", errors.Wrap(err, "can't perform DML")) } } } if lastIdoId != nil { args := map[string]interface{}{"history_type": ht.name, "last_ido_id": lastIdoId} _, err := idb.NamedExec(upsertProgress, &IdoMigrationProgress{ IdoMigrationProgressUpserter{lastIdoId}, envIdHex, ht.name, c.IDO.From, c.IDO.To, }) if err != nil { log.With("backend", "Icinga DB", "dml", upsertProgress, "args", args). Fatalf("%+v", errors.Wrap(err, "can't perform DML")) } } ht.bar.IncrBy(len(idoRows)) return lastIdoId }, ) ht.bar.SetTotal(ht.bar.Current(), true) } // cleanupCache removes /.sqlite3 files. func cleanupCache(f *Flags) { types.forEach(func(ht *historyType) { if ht.cacheFile != "" { if err := ht.cache.Close(); err != nil { log.With("file", ht.cacheFile).Warnf("%+v", errors.Wrap(err, "can't close SQLite database")) } } }) if matches, err := filepath.Glob(path.Join(f.Cache, "*.sqlite3")); err == nil { for _, match := range matches { if err := os.Remove(match); err != nil { log.With("file", match).Warnf("%+v", errors.Wrap(err, "can't remove SQLite database")) } } } else { log.With("dir", f.Cache).Warnf("%+v", errors.Wrap(err, "can't list SQLite databases")) } }