Diffstat (limited to 'cmd/icingadb/main.go')
-rw-r--r-- | cmd/icingadb/main.go | 400 |
1 file changed, 400 insertions, 0 deletions
diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go
new file mode 100644
index 0000000..29b7c44
--- /dev/null
+++ b/cmd/icingadb/main.go
@@ -0,0 +1,400 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"github.com/go-redis/redis/v8"
+	"github.com/icinga/icingadb/internal/command"
+	"github.com/icinga/icingadb/pkg/common"
+	"github.com/icinga/icingadb/pkg/icingadb"
+	"github.com/icinga/icingadb/pkg/icingadb/history"
+	"github.com/icinga/icingadb/pkg/icingadb/overdue"
+	v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
+	"github.com/icinga/icingadb/pkg/icingaredis"
+	"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
+	"github.com/icinga/icingadb/pkg/logging"
+	"github.com/icinga/icingadb/pkg/utils"
+	"github.com/okzk/sdnotify"
+	"github.com/pkg/errors"
+	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
+	"net"
+	"os"
+	"os/signal"
+	"sync"
+	"sync/atomic"
+	"syscall"
+	"time"
+)
+
+const (
+	ExitSuccess                = 0
+	ExitFailure                = 1
+	expectedRedisSchemaVersion = "5"
+)
+
+func main() {
+	os.Exit(run())
+}
+
+func run() int {
+	cmd := command.New()
+	logs, err := logging.NewLogging(
+		utils.AppName(),
+		cmd.Config.Logging.Level,
+		cmd.Config.Logging.Output,
+		cmd.Config.Logging.Options,
+		cmd.Config.Logging.Interval,
+	)
+	if err != nil {
+		utils.Fatal(errors.Wrap(err, "can't configure logging"))
+	}
+	// When started by systemd, NOTIFY_SOCKET is set by systemd for Type=notify supervised services, which is the
+	// default setting for the Icinga DB service. So we notify that Icinga DB finished starting up.
+	_ = sdnotify.Ready()
+
+	logger := logs.GetLogger()
+	defer logger.Sync()
+
+	logger.Info("Starting Icinga DB")
+
+	db, err := cmd.Database(logs.GetChildLogger("database"))
+	if err != nil {
+		logger.Fatalf("%+v", errors.Wrap(err, "can't create database connection pool from config"))
+	}
+	defer db.Close()
+	{
+		logger.Infof("Connecting to database at '%s'", net.JoinHostPort(cmd.Config.Database.Host, fmt.Sprint(cmd.Config.Database.Port)))
+		err := db.Ping()
+		if err != nil {
+			logger.Fatalf("%+v", errors.Wrap(err, "can't connect to database"))
+		}
+	}
+
+	if err := db.CheckSchema(context.Background()); err != nil {
+		logger.Fatalf("%+v", err)
+	}
+
+	rc, err := cmd.Redis(logs.GetChildLogger("redis"))
+	if err != nil {
+		logger.Fatalf("%+v", errors.Wrap(err, "can't create Redis client from config"))
+	}
+	{
+		logger.Infof("Connecting to Redis at '%s'", net.JoinHostPort(cmd.Config.Redis.Host, fmt.Sprint(cmd.Config.Redis.Port)))
+		_, err := rc.Ping(context.Background()).Result()
+		if err != nil {
+			logger.Fatalf("%+v", errors.Wrap(err, "can't connect to Redis"))
+		}
+	}
+
+	{
+		pos, err := checkRedisSchema(logger, rc, "0-0")
+		if err != nil {
+			logger.Fatalf("%+v", err)
+		}
+
+		go monitorRedisSchema(logger, rc, pos)
+	}
+
+	ctx, cancelCtx := context.WithCancel(context.Background())
+	defer cancelCtx()
+
+	// Use dedicated connections for heartbeat and HA to ensure that heartbeats are always processed and
+	// the instance table is updated. Otherwise, the connections can be too busy due to the synchronization of
+	// configuration, status, history, etc., which can lead to handover / takeover loops because
+	// the heartbeat is not read while HA gets stuck when updating the instance table.
+	var heartbeat *icingaredis.Heartbeat
+	var ha *icingadb.HA
+	{
+		rc, err := cmd.Redis(logs.GetChildLogger("redis"))
+		if err != nil {
+			logger.Fatalf("%+v", errors.Wrap(err, "can't create Redis client from config"))
+		}
+		heartbeat = icingaredis.NewHeartbeat(ctx, rc, logs.GetChildLogger("heartbeat"))
+
+		db, err := cmd.Database(logs.GetChildLogger("database"))
+		if err != nil {
+			logger.Fatalf("%+v", errors.Wrap(err, "can't create database connection pool from config"))
+		}
+		defer db.Close()
+		ha = icingadb.NewHA(ctx, db, heartbeat, logs.GetChildLogger("high-availability"))
+
+		telemetryLogger := logs.GetChildLogger("telemetry")
+		telemetry.StartHeartbeat(ctx, rc, telemetryLogger, ha, heartbeat)
+		telemetry.WriteStats(ctx, rc, telemetryLogger)
+	}
+	// Closing ha on exit ensures that this instance retracts its heartbeat
+	// from the database so that another instance can take over immediately.
+	defer func() {
+		// Give up after 3s instead of the default 5m so we don't hang for 5m if the DB is down.
+		ctx, cancelCtx := context.WithTimeout(context.Background(), 3*time.Second)
+
+		ha.Close(ctx)
+		cancelCtx()
+	}()
+	s := icingadb.NewSync(db, rc, logs.GetChildLogger("config-sync"))
+	hs := history.NewSync(db, rc, logs.GetChildLogger("history-sync"))
+	rt := icingadb.NewRuntimeUpdates(db, rc, logs.GetChildLogger("runtime-updates"))
+	ods := overdue.NewSync(db, rc, logs.GetChildLogger("overdue-sync"))
+	ret := history.NewRetention(
+		db,
+		cmd.Config.Retention.HistoryDays,
+		cmd.Config.Retention.SlaDays,
+		cmd.Config.Retention.Interval,
+		cmd.Config.Retention.Count,
+		cmd.Config.Retention.Options,
+		logs.GetChildLogger("retention"),
+	)
+
+	sig := make(chan os.Signal, 1)
+	signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
+
+	go func() {
+		logger.Info("Starting history sync")
+
+		if err := hs.Sync(ctx); err != nil && !utils.IsContextCanceled(err) {
+			logger.Fatalf("%+v", err)
+		}
+	}()
+
+	// Main loop
+	for {
+		hactx, cancelHactx := context.WithCancel(ctx)
+		for hactx.Err() == nil {
+			select {
+			case <-ha.Takeover():
+				logger.Info("Taking over")
+
+				go func() {
+					for hactx.Err() == nil {
+						synctx, cancelSynctx := context.WithCancel(ha.Environment().NewContext(hactx))
+						g, synctx := errgroup.WithContext(synctx)
+						// WaitGroups for initial synchronization.
+						// Runtime updates must wait for initial synchronization to complete.
+						configInitSync := sync.WaitGroup{}
+						stateInitSync := &sync.WaitGroup{}
+
+						// Clear the runtime update streams before starting anything else (rather than after the sync),
+						// otherwise updates may be lost.
+						runtimeConfigUpdateStreams, runtimeStateUpdateStreams, err := rt.ClearStreams(synctx)
+						if err != nil {
+							logger.Fatalf("%+v", err)
+						}
+
+						dump := icingadb.NewDumpSignals(rc, logs.GetChildLogger("dump-signals"))
+						g.Go(func() error {
+							logger.Debug("Starting config dump signal handling")
+
+							return dump.Listen(synctx)
+						})
+
+						g.Go(func() error {
+							select {
+							case <-dump.InProgress():
+								logger.Info("Icinga 2 started a new config dump, waiting for it to complete")
+								cancelSynctx()
+
+								return nil
+							case <-synctx.Done():
+								return synctx.Err()
+							}
+						})
+
+						g.Go(func() error {
+							logger.Info("Starting overdue sync")
+
+							return ods.Sync(synctx)
+						})
+
+						syncStart := time.Now()
+						atomic.StoreInt64(&telemetry.OngoingSyncStartMilli, syncStart.UnixMilli())
+
+						logger.Info("Starting config sync")
+						for _, factory := range v1.ConfigFactories {
+							factory := factory
+
+							configInitSync.Add(1)
+							g.Go(func() error {
+								defer configInitSync.Done()
+
+								return s.SyncAfterDump(synctx, common.NewSyncSubject(factory), dump)
+							})
+						}
+						logger.Info("Starting initial state sync")
+						for _, factory := range v1.StateFactories {
+							factory := factory
+
+							stateInitSync.Add(1)
+							g.Go(func() error {
+								defer stateInitSync.Done()
+
+								return s.SyncAfterDump(synctx, common.NewSyncSubject(factory), dump)
+							})
+						}
+
+						configInitSync.Add(1)
+						g.Go(func() error {
+							defer configInitSync.Done()
+
+							select {
+							case <-dump.Done("icinga:customvar"):
+							case <-synctx.Done():
+								return synctx.Err()
+							}
+
+							return s.SyncCustomvars(synctx)
+						})
+
+						g.Go(func() error {
+							configInitSync.Wait()
+							atomic.StoreInt64(&telemetry.OngoingSyncStartMilli, 0)
+
+							syncEnd := time.Now()
+							elapsed := syncEnd.Sub(syncStart)
+							logger := logs.GetChildLogger("config-sync")
+
+							if synctx.Err() == nil {
+								telemetry.LastSuccessfulSync.Store(telemetry.SuccessfulSync{
+									FinishMilli:   syncEnd.UnixMilli(),
+									DurationMilli: elapsed.Milliseconds(),
+								})
+
+								logger.Infof("Finished config sync in %s", elapsed)
+							} else {
+								logger.Warnf("Aborted config sync after %s", elapsed)
+							}
+
+							return nil
+						})
+
+						g.Go(func() error {
+							stateInitSync.Wait()
+
+							elapsed := time.Since(syncStart)
+							logger := logs.GetChildLogger("config-sync")
+							if synctx.Err() == nil {
+								logger.Infof("Finished initial state sync in %s", elapsed)
+							} else {
+								logger.Warnf("Aborted initial state sync after %s", elapsed)
+							}
+
+							return nil
+						})
+
+						g.Go(func() error {
+							configInitSync.Wait()
+
+							if err := synctx.Err(); err != nil {
+								return err
+							}
+
+							logger.Info("Starting config runtime updates sync")
+
+							return rt.Sync(synctx, v1.ConfigFactories, runtimeConfigUpdateStreams, false)
+						})
+
+						g.Go(func() error {
+							stateInitSync.Wait()
+
+							if err := synctx.Err(); err != nil {
+								return err
+							}
+
+							logger.Info("Starting state runtime updates sync")
+
+							return rt.Sync(synctx, v1.StateFactories, runtimeStateUpdateStreams, true)
+						})
+
+						g.Go(func() error {
+							// Wait for config and state sync to avoid putting additional pressure on the database.
+							configInitSync.Wait()
+							stateInitSync.Wait()
+
+							if err := synctx.Err(); err != nil {
+								return err
+							}
+
+							logger.Info("Starting history retention")
+
+							return ret.Start(synctx)
+						})
+
+						if err := g.Wait(); err != nil && !utils.IsContextCanceled(err) {
+							logger.Fatalf("%+v", err)
+						}
+					}
+				}()
+			case <-ha.Handover():
+				logger.Warn("Handing over")
+
+				cancelHactx()
+			case <-hactx.Done():
+				// Nothing to do here, surrounding loop will terminate now.
+			case <-ha.Done():
+				if err := ha.Err(); err != nil {
+					logger.Fatalf("%+v", errors.Wrap(err, "HA exited with an error"))
+				} else if ctx.Err() == nil {
+					// ha is created as a single instance once. It should only exit if the main context is cancelled,
+					// otherwise there is no way to get Icinga DB back into a working state.
+					logger.Fatalf("%+v", errors.New("HA exited without an error but main context isn't cancelled"))
+				}
+				cancelHactx()
+
+				return ExitFailure
+			case <-ctx.Done():
+				logger.Fatalf("%+v", errors.New("main context closed unexpectedly"))
+			case s := <-sig:
+				logger.Infow("Exiting due to signal", zap.String("signal", s.String()))
+				cancelHactx()
+
+				return ExitSuccess
+			}
+		}
+
+		cancelHactx()
+	}
+}
+
+// monitorRedisSchema monitors rc's icinga:schema version validity.
+func monitorRedisSchema(logger *logging.Logger, rc *icingaredis.Client, pos string) {
+	for {
+		var err error
+		pos, err = checkRedisSchema(logger, rc, pos)
+
+		if err != nil {
+			logger.Fatalf("%+v", err)
+		}
+	}
+}
+
+// checkRedisSchema verifies rc's icinga:schema version.
+func checkRedisSchema(logger *logging.Logger, rc *icingaredis.Client, pos string) (newPos string, err error) {
+	if pos == "0-0" {
+		defer time.AfterFunc(3*time.Second, func() {
+			logger.Info("Waiting for Icinga 2 to write into Redis, please make sure you have started Icinga 2 and the Icinga DB feature is enabled")
+		}).Stop()
+	} else {
+		logger.Debug("Checking Icinga 2 and Icinga DB compatibility")
+	}
+
+	streams, err := rc.XReadUntilResult(context.Background(), &redis.XReadArgs{
+		Streams: []string{"icinga:schema", pos},
+	})
+	if err != nil {
+		return "", errors.Wrap(err, "can't read Redis schema version")
+	}
+
+	message := streams[0].Messages[0]
+	if version := message.Values["version"]; version != expectedRedisSchemaVersion {
+		// Since these error messages are trivial and mostly caused by users, we don't need
+		// to print a stack trace here. However, since errors.Errorf() does this automatically,
+		// we need to use fmt instead.
+		return "", fmt.Errorf(
+			"unexpected Redis schema version: %q (expected %q), please make sure you are running compatible"+
+				" versions of Icinga 2 and Icinga DB", version, expectedRedisSchemaVersion,
+		)
+	}
+
+	logger.Debug("Redis schema version is correct")
+	return message.ID, nil
+}
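
For reference, the schema handshake performed by checkRedisSchema above can be reproduced outside of Icinga DB. The following is a minimal standalone sketch (not part of the commit) that reads the icinga:schema stream with go-redis v8 and prints its "version" field; the address localhost:6380 is an assumption based on a typical Icinga DB Redis setup and may differ in your environment.

package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

func main() {
	// Assumption: the Icinga 2 icingadb feature writes to Redis on localhost:6380.
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6380"})
	defer rdb.Close()

	// Read everything currently in the icinga:schema stream. Icinga 2 writes an
	// entry whose "version" field should match the expected schema version ("5")
	// from main.go above.
	msgs, err := rdb.XRange(context.Background(), "icinga:schema", "-", "+").Result()
	if err != nil {
		panic(err)
	}
	if len(msgs) == 0 {
		fmt.Println("icinga:schema is empty; is Icinga 2 running with the icingadb feature enabled?")
		return
	}

	fmt.Println("Redis schema version:", msgs[0].Values["version"])
}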