| field | value | date |
|---|---|---|
| author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 11:41:39 +0000 |
| committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 11:41:39 +0000 |
| commit | 6696b6fca647cb1fe1ce74e8127090aa61482cab (patch) | |
| tree | 75c2028bfcc4d098edf4679aa5dcefcdd45da00f /pkg | |
| parent | Adding debian version 1.1.1-2. (diff) | |
| download | icingadb-6696b6fca647cb1fe1ce74e8127090aa61482cab.tar.xz icingadb-6696b6fca647cb1fe1ce74e8127090aa61482cab.zip | |
Merging upstream version 1.2.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'pkg')
29 files changed, 572 insertions, 400 deletions
diff --git a/pkg/backoff/backoff.go b/pkg/backoff/backoff.go index 6ce7bee..e79a1ee 100644 --- a/pkg/backoff/backoff.go +++ b/pkg/backoff/backoff.go @@ -14,10 +14,10 @@ type Backoff func(uint64) time.Duration // It panics if min >= max. func NewExponentialWithJitter(min, max time.Duration) Backoff { if min <= 0 { - min = time.Millisecond * 100 + min = 100 * time.Millisecond } if max <= 0 { - max = time.Second * 10 + max = 10 * time.Second } if min >= max { panic("max must be larger than min") diff --git a/pkg/config/config.go b/pkg/config/config.go index a683014..744f4c3 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1,15 +1,12 @@ package config import ( - "bytes" "crypto/tls" "crypto/x509" - "fmt" "github.com/creasty/defaults" "github.com/goccy/go-yaml" "github.com/jessevdk/go-flags" "github.com/pkg/errors" - "io/ioutil" "os" ) @@ -19,8 +16,6 @@ type Config struct { Redis Redis `yaml:"redis"` Logging Logging `yaml:"logging"` Retention Retention `yaml:"retention"` - - DecodeWarning error `yaml:"-"` } // Validate checks constraints in the supplied configuration and returns an error if they are violated. @@ -51,13 +46,14 @@ type Flags struct { // FromYAMLFile returns a new Config value created from the given YAML config file. func FromYAMLFile(name string) (*Config, error) { - f, err := os.ReadFile(name) + f, err := os.Open(name) if err != nil { - return nil, errors.Wrap(err, "can't read YAML file "+name) + return nil, errors.Wrap(err, "can't open YAML file "+name) } + defer f.Close() c := &Config{} - d := yaml.NewDecoder(bytes.NewReader(f)) + d := yaml.NewDecoder(f, yaml.DisallowUnknownField()) if err := defaults.Set(c); err != nil { return nil, errors.Wrap(err, "can't set config defaults") @@ -67,16 +63,8 @@ func FromYAMLFile(name string) (*Config, error) { return nil, errors.Wrap(err, "can't parse YAML file "+name) } - // Decode again with yaml.DisallowUnknownField() (like v1.2 will do) and issue a warning if it returns an error. 
- c.DecodeWarning = yaml.NewDecoder(bytes.NewReader(f), yaml.DisallowUnknownField()).Decode(&Config{}) - if err := c.Validate(); err != nil { - const msg = "invalid configuration" - if warn := c.DecodeWarning; warn != nil { - return nil, fmt.Errorf("%s: %w\n\nwarning: ignored unknown config option:\n\n%v", msg, err, warn) - } else { - return nil, errors.Wrap(err, msg) - } + return nil, errors.Wrap(err, "invalid configuration") } return c, nil @@ -129,7 +117,7 @@ func (t *TLS) MakeConfig(serverName string) (*tls.Config, error) { if t.Insecure { tlsConfig.InsecureSkipVerify = true } else if t.Ca != "" { - raw, err := ioutil.ReadFile(t.Ca) + raw, err := os.ReadFile(t.Ca) if err != nil { return nil, errors.Wrap(err, "can't read CA file") } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 2418094..94e3773 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -20,34 +20,33 @@ redis: host: 2001:db8::1 ` - miniOutput := &Config{} - _ = defaults.Set(miniOutput) - - miniOutput.Database.Host = "192.0.2.1" - miniOutput.Database.Database = "icingadb" - miniOutput.Database.User = "icingadb" - miniOutput.Database.Password = "icingadb" - - miniOutput.Redis.Host = "2001:db8::1" - miniOutput.Logging.Output = logging.CONSOLE - subtests := []struct { name string input string output *Config - warn bool }{ { - name: "mini", - input: miniConf, - output: miniOutput, - warn: false, + name: "mini", + input: miniConf, + output: func() *Config { + c := &Config{} + _ = defaults.Set(c) + + c.Database.Host = "192.0.2.1" + c.Database.Database = "icingadb" + c.Database.User = "icingadb" + c.Database.Password = "icingadb" + + c.Redis.Host = "2001:db8::1" + c.Logging.Output = logging.CONSOLE + + return c + }(), }, { name: "mini-with-unknown", input: miniConf + "\nunknown: 42", - output: miniOutput, - warn: true, + output: nil, }, } @@ -59,19 +58,12 @@ redis: require.NoError(t, os.WriteFile(tempFile.Name(), []byte(st.input), 0o600)) - actual, err := FromYAMLFile(tempFile.Name()) - require.NoError(t, err) - - if st.warn { - require.Error(t, actual.DecodeWarning, "reading config should produce a warning") - - // Reset the warning so that the following require.Equal() doesn't try to compare it. - actual.DecodeWarning = nil + if actual, err := FromYAMLFile(tempFile.Name()); st.output == nil { + require.Error(t, err) } else { - require.NoError(t, actual.DecodeWarning, "reading config should not produce a warning") + require.NoError(t, err) + require.Equal(t, st.output, actual) } - - require.Equal(t, st.output, actual) }) } } diff --git a/pkg/config/database.go b/pkg/config/database.go index b42ff8e..0895d26 100644 --- a/pkg/config/database.go +++ b/pkg/config/database.go @@ -1,25 +1,25 @@ package config import ( + "context" + "database/sql" + "database/sql/driver" "fmt" "github.com/go-sql-driver/mysql" - "github.com/icinga/icingadb/pkg/driver" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx/reflectx" + "github.com/lib/pq" "github.com/pkg/errors" "net" "net/url" "strconv" "strings" - "sync" "time" ) -var registerDriverOnce sync.Once - // Database defines database client configuration. type Database struct { Type string `yaml:"type" default:"mysql"` @@ -35,17 +35,14 @@ type Database struct { // Open prepares the DSN string and driver configuration, // calls sqlx.Open, but returns *icingadb.DB. 
func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { - registerDriverOnce.Do(func() { - driver.Register(logger) - }) - - var dsn string + var db *sqlx.DB switch d.Type { case "mysql": config := mysql.NewConfig() config.User = d.User config.Passwd = d.Password + config.Logger = icingadb.MysqlFuncLogger(logger.Debug) if d.isUnixAddr() { config.Net = "unix" @@ -61,7 +58,7 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { config.DBName = d.Database config.Timeout = time.Minute - config.Params = map[string]string{"sql_mode": "ANSI_QUOTES"} + config.Params = map[string]string{"sql_mode": "'TRADITIONAL,ANSI_QUOTES'"} tlsConfig, err := d.TlsOptions.MakeConfig(d.Host) if err != nil { @@ -75,7 +72,17 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { } } - dsn = config.FormatDSN() + c, err := mysql.NewConnector(config) + if err != nil { + return nil, errors.Wrap(err, "can't open mysql database") + } + + wsrepSyncWait := int64(d.Options.WsrepSyncWait) + setWsrepSyncWait := func(ctx context.Context, conn driver.Conn) error { + return setGaleraOpts(ctx, conn, wsrepSyncWait) + } + + db = sqlx.NewDb(sql.OpenDB(icingadb.NewConnector(c, logger, setWsrepSyncWait)), icingadb.MySQL) case "pgsql": uri := &url.URL{ Scheme: "postgres", @@ -123,16 +130,17 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { } uri.RawQuery = query.Encode() - dsn = uri.String() + + connector, err := pq.NewConnector(uri.String()) + if err != nil { + return nil, errors.Wrap(err, "can't open pgsql database") + } + + db = sqlx.NewDb(sql.OpenDB(icingadb.NewConnector(connector, logger, nil)), icingadb.PostgreSQL) default: return nil, unknownDbType(d.Type) } - db, err := sqlx.Open("icingadb-"+d.Type, dsn) - if err != nil { - return nil, errors.Wrap(err, "can't open database") - } - db.SetMaxIdleConns(d.Options.MaxConnections / 3) db.SetMaxOpenConns(d.Options.MaxConnections) @@ -173,3 +181,36 @@ func (d *Database) isUnixAddr() bool { func unknownDbType(t string) error { return errors.Errorf(`unknown database type %q, must be one of: "mysql", "pgsql"`, t) } + +// setGaleraOpts sets the "wsrep_sync_wait" variable for each session ensures that causality checks are performed +// before execution and that each statement is executed on a fully synchronized node. Doing so prevents foreign key +// violation when inserting into dependent tables on different MariaDB/MySQL nodes. When using MySQL single nodes, +// the "SET SESSION" command will fail with "Unknown system variable (1193)" and will therefore be silently dropped. +// +// https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait +func setGaleraOpts(ctx context.Context, conn driver.Conn, wsrepSyncWait int64) error { + const galeraOpts = "SET SESSION wsrep_sync_wait=?" + + stmt, err := conn.(driver.ConnPrepareContext).PrepareContext(ctx, galeraOpts) + if err != nil { + if errors.Is(err, &mysql.MySQLError{Number: 1193}) { // Unknown system variable + return nil + } + + return errors.Wrap(err, "cannot prepare "+galeraOpts) + } + // This is just for an unexpected exit and any returned error can safely be ignored and in case + // of the normal function exit, the stmt is closed manually, and its error is handled gracefully. 
+ defer func() { _ = stmt.Close() }() + + _, err = stmt.(driver.StmtExecContext).ExecContext(ctx, []driver.NamedValue{{Value: wsrepSyncWait}}) + if err != nil { + return errors.Wrap(err, "cannot execute "+galeraOpts) + } + + if err = stmt.Close(); err != nil { + return errors.Wrap(err, "cannot close prepared statement "+galeraOpts) + } + + return nil +} diff --git a/pkg/config/redis.go b/pkg/config/redis.go index 38571e3..ad8b31a 100644 --- a/pkg/config/redis.go +++ b/pkg/config/redis.go @@ -4,13 +4,13 @@ import ( "context" "crypto/tls" "fmt" - "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/pkg/backoff" "github.com/icinga/icingadb/pkg/icingaredis" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/retry" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" + "github.com/redis/go-redis/v9" "go.uber.org/zap" "net" "strings" @@ -85,16 +85,16 @@ func dialWithLogging(dialer ctxDialerFunc, logger *logging.Logger) ctxDialerFunc retry.Retryable, backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), retry.Settings{ - Timeout: 5 * time.Minute, - OnError: func(_ time.Duration, _ uint64, err, lastErr error) { + Timeout: retry.DefaultTimeout, + OnRetryableError: func(_ time.Duration, _ uint64, err, lastErr error) { if lastErr == nil || err.Error() != lastErr.Error() { logger.Warnw("Can't connect to Redis. Retrying", zap.Error(err)) } }, OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) { - if attempt > 0 { + if attempt > 1 { logger.Infow("Reconnected to Redis", - zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1)) + zap.Duration("after", elapsed), zap.Uint64("attempts", attempt)) } }, }, diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go deleted file mode 100644 index f529db4..0000000 --- a/pkg/driver/driver.go +++ /dev/null @@ -1,114 +0,0 @@ -package driver - -import ( - "context" - "database/sql" - "database/sql/driver" - "github.com/go-sql-driver/mysql" - "github.com/icinga/icingadb/pkg/backoff" - "github.com/icinga/icingadb/pkg/icingaredis/telemetry" - "github.com/icinga/icingadb/pkg/logging" - "github.com/icinga/icingadb/pkg/retry" - "github.com/jmoiron/sqlx" - "github.com/pkg/errors" - "go.uber.org/zap" - "time" -) - -const MySQL = "icingadb-mysql" -const PostgreSQL = "icingadb-pgsql" - -var timeout = time.Minute * 5 - -// RetryConnector wraps driver.Connector with retry logic. -type RetryConnector struct { - driver.Connector - driver Driver -} - -// Connect implements part of the driver.Connector interface. -func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) { - var conn driver.Conn - err := errors.Wrap(retry.WithBackoff( - ctx, - func(ctx context.Context) (err error) { - conn, err = c.Connector.Connect(ctx) - return - }, - shouldRetry, - backoff.NewExponentialWithJitter(time.Millisecond*128, time.Minute*1), - retry.Settings{ - Timeout: timeout, - OnError: func(_ time.Duration, _ uint64, err, lastErr error) { - telemetry.UpdateCurrentDbConnErr(err) - - if lastErr == nil || err.Error() != lastErr.Error() { - c.driver.Logger.Warnw("Can't connect to database. Retrying", zap.Error(err)) - } - }, - OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) { - telemetry.UpdateCurrentDbConnErr(nil) - - if attempt > 0 { - c.driver.Logger.Infow("Reconnected to database", - zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1)) - } - }, - }, - ), "can't connect to database") - return conn, err -} - -// Driver implements part of the driver.Connector interface. 
-func (c RetryConnector) Driver() driver.Driver { - return c.driver -} - -// Driver wraps a driver.Driver that also must implement driver.DriverContext with logging capabilities and provides our RetryConnector. -type Driver struct { - ctxDriver - Logger *logging.Logger -} - -// OpenConnector implements the DriverContext interface. -func (d Driver) OpenConnector(name string) (driver.Connector, error) { - c, err := d.ctxDriver.OpenConnector(name) - if err != nil { - return nil, err - } - - return &RetryConnector{ - driver: d, - Connector: c, - }, nil -} - -// Register makes our database Driver available under the name "icingadb-*sql". -func Register(logger *logging.Logger) { - sql.Register(MySQL, &Driver{ctxDriver: &mysql.MySQLDriver{}, Logger: logger}) - sql.Register(PostgreSQL, &Driver{ctxDriver: PgSQLDriver{}, Logger: logger}) - _ = mysql.SetLogger(mysqlLogger(func(v ...interface{}) { logger.Debug(v...) })) - sqlx.BindDriver(PostgreSQL, sqlx.DOLLAR) -} - -// ctxDriver helps ensure that we only support drivers that implement driver.Driver and driver.DriverContext. -type ctxDriver interface { - driver.Driver - driver.DriverContext -} - -// mysqlLogger is an adapter that allows ordinary functions to be used as a logger for mysql.SetLogger. -type mysqlLogger func(v ...interface{}) - -// Print implements the mysql.Logger interface. -func (log mysqlLogger) Print(v ...interface{}) { - log(v) -} - -func shouldRetry(err error) bool { - if errors.Is(err, driver.ErrBadConn) { - return true - } - - return retry.Retryable(err) -} diff --git a/pkg/driver/pgsql.go b/pkg/driver/pgsql.go deleted file mode 100644 index 3c88fe0..0000000 --- a/pkg/driver/pgsql.go +++ /dev/null @@ -1,22 +0,0 @@ -package driver - -import ( - "database/sql/driver" - "github.com/lib/pq" -) - -// PgSQLDriver extends pq.Driver with driver.DriverContext compliance. -type PgSQLDriver struct { - pq.Driver -} - -// Assert interface compliance. -var ( - _ driver.Driver = PgSQLDriver{} - _ driver.DriverContext = PgSQLDriver{} -) - -// OpenConnector implements the driver.DriverContext interface. 
-func (PgSQLDriver) OpenConnector(name string) (driver.Connector, error) { - return pq.NewConnector(name) -} diff --git a/pkg/flatten/flatten.go b/pkg/flatten/flatten.go index 94a6e7e..698eff1 100644 --- a/pkg/flatten/flatten.go +++ b/pkg/flatten/flatten.go @@ -1,7 +1,6 @@ package flatten import ( - "database/sql" "fmt" "github.com/icinga/icingadb/pkg/types" "strconv" @@ -32,12 +31,12 @@ func Flatten(value interface{}, prefix string) map[string]types.String { for i, v := range value { flatten(key+"["+strconv.Itoa(i)+"]", v) } + case nil: + flattened[key] = types.MakeString("null") + case float64: + flattened[key] = types.MakeString(strconv.FormatFloat(value, 'f', -1, 64)) default: - val := "null" - if value != nil { - val = fmt.Sprintf("%v", value) - } - flattened[key] = types.String{NullString: sql.NullString{String: val, Valid: true}} + flattened[key] = types.MakeString(fmt.Sprintf("%v", value)) } } diff --git a/pkg/flatten/flatten_test.go b/pkg/flatten/flatten_test.go new file mode 100644 index 0000000..f84b8d9 --- /dev/null +++ b/pkg/flatten/flatten_test.go @@ -0,0 +1,45 @@ +package flatten + +import ( + "github.com/icinga/icingadb/pkg/types" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestFlatten(t *testing.T) { + for _, st := range []struct { + name string + prefix string + value any + output map[string]types.String + }{ + {"nil", "a", nil, map[string]types.String{"a": types.MakeString("null")}}, + {"bool", "b", true, map[string]types.String{"b": types.MakeString("true")}}, + {"int", "c", 42, map[string]types.String{"c": types.MakeString("42")}}, + {"float", "d", 77.7, map[string]types.String{"d": types.MakeString("77.7")}}, + {"large_float", "e", 1e23, map[string]types.String{"e": types.MakeString("100000000000000000000000")}}, + {"string", "f", "\x00", map[string]types.String{"f": types.MakeString("\x00")}}, + {"nil_slice", "g", []any(nil), map[string]types.String{"g": {}}}, + {"empty_slice", "h", []any{}, map[string]types.String{"h": {}}}, + {"slice", "i", []any{nil}, map[string]types.String{"i[0]": types.MakeString("null")}}, + {"nil_map", "j", map[string]any(nil), map[string]types.String{"j": {}}}, + {"empty_map", "k", map[string]any{}, map[string]types.String{"k": {}}}, + {"map", "l", map[string]any{" ": nil}, map[string]types.String{"l. ": types.MakeString("null")}}, + {"map_with_slice", "m", map[string]any{"\t": []any{"ä", "ö", "ü"}, "ß": "s"}, map[string]types.String{ + "m.\t[0]": types.MakeString("ä"), + "m.\t[1]": types.MakeString("ö"), + "m.\t[2]": types.MakeString("ü"), + "m.ß": types.MakeString("s"), + }}, + {"slice_with_map", "n", []any{map[string]any{"ä": "a", "ö": "o", "ü": "u"}, "ß"}, map[string]types.String{ + "n[0].ä": types.MakeString("a"), + "n[0].ö": types.MakeString("o"), + "n[0].ü": types.MakeString("u"), + "n[1]": types.MakeString("ß"), + }}, + } { + t.Run(st.name, func(t *testing.T) { + assert.Equal(t, st.output, Flatten(st.value, st.prefix)) + }) + } +} diff --git a/pkg/icingadb/cleanup.go b/pkg/icingadb/cleanup.go index e57eafa..22bf02d 100644 --- a/pkg/icingadb/cleanup.go +++ b/pkg/icingadb/cleanup.go @@ -4,8 +4,9 @@ import ( "context" "fmt" "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/backoff" "github.com/icinga/icingadb/pkg/com" - "github.com/icinga/icingadb/pkg/driver" + "github.com/icinga/icingadb/pkg/retry" "github.com/icinga/icingadb/pkg/types" "time" ) @@ -20,10 +21,10 @@ type CleanupStmt struct { // Build assembles the cleanup statement for the specified database driver with the given limit. 
func (stmt *CleanupStmt) Build(driverName string, limit uint64) string { switch driverName { - case driver.MySQL, "mysql": + case MySQL: return fmt.Sprintf(`DELETE FROM %[1]s WHERE environment_id = :environment_id AND %[2]s < :time ORDER BY %[2]s LIMIT %[3]d`, stmt.Table, stmt.Column, limit) - case driver.PostgreSQL, "postgres": + case PostgreSQL: return fmt.Sprintf(`WITH rows AS ( SELECT %[1]s FROM %[2]s WHERE environment_id = :environment_id AND %[3]s < :time ORDER BY %[3]s LIMIT %[4]d ) @@ -41,32 +42,46 @@ func (db *DB) CleanupOlderThan( count uint64, olderThan time.Time, onSuccess ...OnSuccess[struct{}], ) (uint64, error) { var counter com.Counter - defer db.log(ctx, stmt.Build(db.DriverName(), 0), &counter).Stop() + + q := db.Rebind(stmt.Build(db.DriverName(), count)) + + defer db.log(ctx, q, &counter).Stop() for { - q := db.Rebind(stmt.Build(db.DriverName(), count)) - rs, err := db.NamedExecContext(ctx, q, cleanupWhere{ - EnvironmentId: envId, - Time: types.UnixMilli(olderThan), - }) - if err != nil { - return 0, internal.CantPerformQuery(err, q) - } + var rowsDeleted int64 + + err := retry.WithBackoff( + ctx, + func(ctx context.Context) error { + rs, err := db.NamedExecContext(ctx, q, cleanupWhere{ + EnvironmentId: envId, + Time: types.UnixMilli(olderThan), + }) + if err != nil { + return internal.CantPerformQuery(err, q) + } + + rowsDeleted, err = rs.RowsAffected() - n, err := rs.RowsAffected() + return err + }, + retry.Retryable, + backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), + db.getDefaultRetrySettings(), + ) if err != nil { return 0, err } - counter.Add(uint64(n)) + counter.Add(uint64(rowsDeleted)) for _, onSuccess := range onSuccess { - if err := onSuccess(ctx, make([]struct{}, n)); err != nil { + if err := onSuccess(ctx, make([]struct{}, rowsDeleted)); err != nil { return 0, err } } - if n < int64(count) { + if rowsDeleted < int64(count) { break } } diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 4ff3e0d..47940af 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -7,13 +7,13 @@ import ( "github.com/icinga/icingadb/pkg/backoff" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/contracts" - "github.com/icinga/icingadb/pkg/driver" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/retry" "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "github.com/pkg/errors" + "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "reflect" @@ -54,6 +54,12 @@ type Options struct { // MaxRowsPerTransaction defines the maximum number of rows per transaction. // The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism. MaxRowsPerTransaction int `yaml:"max_rows_per_transaction" default:"8192"` + + // WsrepSyncWait enforces Galera cluster nodes to perform strict cluster-wide causality checks + // before executing specific SQL queries determined by the number you provided. + // Please refer to the below link for a detailed description. + // https://icinga.com/docs/icinga-db/latest/doc/03-Configuration/#galera-cluster + WsrepSyncWait int `yaml:"wsrep_sync_wait" default:"7"` } // Validate checks constraints in the supplied database options and returns an error if they are violated. 
@@ -70,6 +76,9 @@ func (o *Options) Validate() error { if o.MaxRowsPerTransaction < 1 { return errors.New("max_rows_per_transaction must be at least 1") } + if o.WsrepSyncWait < 0 || o.WsrepSyncWait > 15 { + return errors.New("wsrep_sync_wait can only be set to a number between 0 and 15") + } return nil } @@ -85,23 +94,35 @@ func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB { } const ( - expectedMysqlSchemaVersion = 4 - expectedPostgresSchemaVersion = 2 + expectedMysqlSchemaVersion = 5 + expectedPostgresSchemaVersion = 3 ) // CheckSchema asserts the database schema of the expected version being present. func (db *DB) CheckSchema(ctx context.Context) error { var expectedDbSchemaVersion uint16 switch db.DriverName() { - case driver.MySQL: + case MySQL: expectedDbSchemaVersion = expectedMysqlSchemaVersion - case driver.PostgreSQL: + case PostgreSQL: expectedDbSchemaVersion = expectedPostgresSchemaVersion } var version uint16 - err := db.QueryRowxContext(ctx, "SELECT version FROM icingadb_schema ORDER BY id DESC LIMIT 1").Scan(&version) + err := retry.WithBackoff( + ctx, + func(ctx context.Context) (err error) { + query := "SELECT version FROM icingadb_schema ORDER BY id DESC LIMIT 1" + err = db.QueryRowxContext(ctx, query).Scan(&version) + if err != nil { + err = internal.CantPerformQuery(err, query) + } + return + }, + retry.Retryable, + backoff.NewExponentialWithJitter(128*time.Millisecond, 1*time.Minute), + db.getDefaultRetrySettings()) if err != nil { return errors.Wrap(err, "can't check database schema version") } @@ -161,10 +182,10 @@ func (db *DB) BuildInsertIgnoreStmt(into interface{}) (string, int) { var clause string switch db.DriverName() { - case driver.MySQL: + case MySQL: // MySQL treats UPDATE id = id as a no-op. clause = fmt.Sprintf(`ON DUPLICATE KEY UPDATE "%s" = "%s"`, columns[0], columns[0]) - case driver.PostgreSQL: + case PostgreSQL: clause = fmt.Sprintf("ON CONFLICT ON CONSTRAINT pk_%s DO NOTHING", table) } @@ -224,10 +245,10 @@ func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders in var clause, setFormat string switch db.DriverName() { - case driver.MySQL: + case MySQL: clause = "ON DUPLICATE KEY UPDATE" setFormat = `"%[1]s" = VALUES("%[1]s")` - case driver.PostgreSQL: + case PostgreSQL: clause = fmt.Sprintf("ON CONFLICT ON CONSTRAINT pk_%s DO UPDATE SET", table) setFormat = `"%[1]s" = EXCLUDED."%[1]s"` } @@ -338,7 +359,7 @@ func (db *DB) BulkExec( }, retry.Retryable, backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), - retry.Settings{}, + db.getDefaultRetrySettings(), ) } }(b)) @@ -403,7 +424,7 @@ func (db *DB) NamedBulkExec( }, retry.Retryable, backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), - retry.Settings{}, + db.getDefaultRetrySettings(), ) } }(b)) @@ -476,7 +497,7 @@ func (db *DB) NamedBulkExecTx( }, retry.Retryable, backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), - retry.Settings{}, + db.getDefaultRetrySettings(), ) } }(b)) @@ -662,6 +683,25 @@ func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted { } } +func (db *DB) getDefaultRetrySettings() retry.Settings { + return retry.Settings{ + Timeout: retry.DefaultTimeout, + OnRetryableError: func(_ time.Duration, _ uint64, err, lastErr error) { + if lastErr == nil || err.Error() != lastErr.Error() { + db.logger.Warnw("Can't execute query. 
Retrying", zap.Error(err)) + } + }, + OnSuccess: func(elapsed time.Duration, attempt uint64, lastErr error) { + if attempt > 1 { + db.logger.Infow("Query retried successfully after error", + zap.Duration("after", elapsed), + zap.Uint64("attempts", attempt), + zap.NamedError("recovered_error", lastErr)) + } + }, + } +} + func (db *DB) log(ctx context.Context, query string, counter *com.Counter) periodic.Stopper { return periodic.Start(ctx, db.logger.Interval(), func(tick periodic.Tick) { if count := counter.Reset(); count > 0 { diff --git a/pkg/icingadb/driver.go b/pkg/icingadb/driver.go new file mode 100644 index 0000000..d564916 --- /dev/null +++ b/pkg/icingadb/driver.go @@ -0,0 +1,90 @@ +package icingadb + +import ( + "context" + "database/sql/driver" + "github.com/icinga/icingadb/pkg/backoff" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/retry" + "github.com/pkg/errors" + "go.uber.org/zap" + "time" +) + +// Driver names as automatically registered in the database/sql package by themselves. +const ( + MySQL string = "mysql" + PostgreSQL string = "postgres" +) + +type InitConnFunc func(context.Context, driver.Conn) error + +// RetryConnector wraps driver.Connector with retry logic. +type RetryConnector struct { + driver.Connector + + logger *logging.Logger + + // initConn can be used to execute post Connect() arbitrary actions. + // It will be called after successfully initiated a new connection using the connector's Connect method. + initConn InitConnFunc +} + +// NewConnector creates a fully initialized RetryConnector from the given args. +func NewConnector(c driver.Connector, logger *logging.Logger, init InitConnFunc) *RetryConnector { + return &RetryConnector{Connector: c, logger: logger, initConn: init} +} + +// Connect implements part of the driver.Connector interface. +func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) { + var conn driver.Conn + err := errors.Wrap(retry.WithBackoff( + ctx, + func(ctx context.Context) (err error) { + conn, err = c.Connector.Connect(ctx) + if err == nil && c.initConn != nil { + if err = c.initConn(ctx, conn); err != nil { + // We're going to retry this, so just don't bother whether Close() fails! + _ = conn.Close() + } + } + + return + }, + retry.Retryable, + backoff.NewExponentialWithJitter(128*time.Millisecond, 1*time.Minute), + retry.Settings{ + Timeout: retry.DefaultTimeout, + OnRetryableError: func(_ time.Duration, _ uint64, err, lastErr error) { + telemetry.UpdateCurrentDbConnErr(err) + + if lastErr == nil || err.Error() != lastErr.Error() { + c.logger.Warnw("Can't connect to database. Retrying", zap.Error(err)) + } + }, + OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) { + telemetry.UpdateCurrentDbConnErr(nil) + + if attempt > 1 { + c.logger.Infow("Reconnected to database", + zap.Duration("after", elapsed), zap.Uint64("attempts", attempt)) + } + }, + }, + ), "can't connect to database") + return conn, err +} + +// Driver implements part of the driver.Connector interface. +func (c RetryConnector) Driver() driver.Driver { + return c.Connector.Driver() +} + +// MysqlFuncLogger is an adapter that allows ordinary functions to be used as a logger for mysql.SetLogger. +type MysqlFuncLogger func(v ...interface{}) + +// Print implements the mysql.Logger interface. 
+func (log MysqlFuncLogger) Print(v ...interface{}) { + log(v) +} diff --git a/pkg/icingadb/dump_signals.go b/pkg/icingadb/dump_signals.go index bce1aef..2f8b46e 100644 --- a/pkg/icingadb/dump_signals.go +++ b/pkg/icingadb/dump_signals.go @@ -2,10 +2,10 @@ package icingadb import ( "context" - "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/pkg/icingaredis" "github.com/icinga/icingadb/pkg/logging" "github.com/pkg/errors" + "github.com/redis/go-redis/v9" "go.uber.org/zap" "sync" ) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 74d3b32..cc32a4b 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -9,7 +9,6 @@ import ( "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/backoff" "github.com/icinga/icingadb/pkg/com" - "github.com/icinga/icingadb/pkg/driver" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1" @@ -23,7 +22,10 @@ import ( "time" ) -var timeout = 60 * time.Second +// peerTimeout defines the timeout for HA heartbeats, being used to detect absent nodes. +// +// Because this timeout relies on icingaredis.Timeout, it is icingaredis.Timeout plus a short grace period. +const peerTimeout = icingaredis.Timeout + 5*time.Second type haState struct { responsibleTsMilli int64 @@ -43,8 +45,8 @@ type HA struct { heartbeat *icingaredis.Heartbeat logger *logging.Logger responsible bool - handover chan struct{} - takeover chan struct{} + handover chan string + takeover chan string done chan struct{} errOnce sync.Once errMu sync.Mutex @@ -64,8 +66,8 @@ func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger db: db, heartbeat: heartbeat, logger: logger, - handover: make(chan struct{}), - takeover: make(chan struct{}), + handover: make(chan string), + takeover: make(chan string), done: make(chan struct{}), } @@ -107,13 +109,13 @@ func (h *HA) Err() error { return h.err } -// Handover returns a channel with which handovers are signaled. -func (h *HA) Handover() chan struct{} { +// Handover returns a channel with which handovers and their reasons are signaled. +func (h *HA) Handover() chan string { return h.handover } -// Takeover returns a channel with which takeovers are signaled. -func (h *HA) Takeover() chan struct{} { +// Takeover returns a channel with which takeovers and their reasons are signaled. +func (h *HA) Takeover() chan string { return h.takeover } @@ -141,12 +143,24 @@ func (h *HA) controller() { oldInstancesRemoved := false - logTicker := time.NewTicker(time.Second * 60) - defer logTicker.Stop() - shouldLog := true + // Suppress recurring log messages in the realize method to be only logged this often. + routineLogTicker := time.NewTicker(5 * time.Minute) + defer routineLogTicker.Stop() + shouldLogRoutineEvents := true + + // The retry logic in HA is twofold: + // + // 1) Updating or inserting the instance row based on the current heartbeat must be done within the heartbeat's + // expiration time. Therefore, we use a deadline ctx to retry.WithBackoff() in realize() which expires earlier + // than our default timeout. + // 2) Since we do not want to exit before our default timeout expires, we have to repeat step 1 until it does. 
+ retryTimeout := time.NewTimer(retry.DefaultTimeout) + defer retryTimeout.Stop() for { select { + case <-retryTimeout.C: + h.abort(errors.New("retry deadline exceeded")) case m := <-h.heartbeat.Events(): if m != nil { now := time.Now() @@ -158,10 +172,15 @@ func (h *HA) controller() { if tt.After(now.Add(1 * time.Second)) { h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt)) } - if tt.Before(now.Add(-1 * timeout)) { + if tt.Before(now.Add(-1 * peerTimeout)) { h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt)) - h.signalHandover() + + h.signalHandover("received heartbeat from the past") h.realizeLostHeartbeat() + + // Reset retry timeout so that the next iterations have the full amount of time available again. + retry.ResetTimeout(retryTimeout, retry.DefaultTimeout) + continue } s, err := m.Stats().IcingaStatus() @@ -186,33 +205,28 @@ func (h *HA) controller() { EntityWithoutChecksum: v1.EntityWithoutChecksum{IdMeta: v1.IdMeta{ Id: envId, }}, - Name: types.String{ - NullString: sql.NullString{ - String: envId.String(), - Valid: true, - }, - }, + Name: types.MakeString(envId.String()), } h.environmentMu.Unlock() } select { - case <-logTicker.C: - shouldLog = true + case <-routineLogTicker.C: + shouldLogRoutineEvents = true default: } - var realizeCtx context.Context - var cancelRealizeCtx context.CancelFunc - if h.responsible { - realizeCtx, cancelRealizeCtx = context.WithDeadline(h.ctx, m.ExpiryTime()) - } else { - realizeCtx, cancelRealizeCtx = context.WithCancel(h.ctx) - } - err = h.realize(realizeCtx, s, t, envId, shouldLog) + // Ensure that updating/inserting the instance row is completed by the current heartbeat's expiry time. + realizeCtx, cancelRealizeCtx := context.WithDeadline(h.ctx, m.ExpiryTime()) + err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents) cancelRealizeCtx() if errors.Is(err, context.DeadlineExceeded) { - h.signalHandover() + h.signalHandover("instance update/insert deadline exceeded heartbeat expiry time") + + // Instance insert/update was not completed by the expiration time of the current heartbeat. + // Pass control back to the loop to try again with the next heartbeat, + // or exit the loop when the retry timeout has expired. Therefore, + // retry timeout is **not** reset here so that retries continue until the timeout has expired. continue } if err != nil { @@ -224,12 +238,20 @@ func (h *HA) controller() { oldInstancesRemoved = true } - shouldLog = false + shouldLogRoutineEvents = false } else { h.logger.Error("Lost heartbeat") - h.signalHandover() + h.signalHandover("lost heartbeat") h.realizeLostHeartbeat() } + + // Reset retry timeout so that the next iterations have the full amount of time available again. + // Don't be surprised by the location of the code, + // as it is obvious that the timer is also reset after an error that ends the loop anyway. + // But this is the best place to catch all scenarios where the timeout needs to be reset. + // And since HA needs quite a bit of refactoring anyway to e.g. return immediately after calling h.abort(), + // it's fine to have it here for now. 
+ retry.ResetTimeout(retryTimeout, retry.DefaultTimeout) case <-h.heartbeat.Done(): if err := h.heartbeat.Err(); err != nil { h.abort(err) @@ -240,18 +262,34 @@ func (h *HA) controller() { } } -func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, envId types.Binary, shouldLog bool) error { - var takeover, otherResponsible bool +// realize a HA cycle triggered by a heartbeat event. +// +// shouldLogRoutineEvents indicates if recurrent events should be logged. +func (h *HA) realize( + ctx context.Context, + s *icingaredisv1.IcingaStatus, + t *types.UnixMilli, + envId types.Binary, + shouldLogRoutineEvents bool, +) error { + var ( + takeover string + otherResponsible bool + ) + + if _, ok := ctx.Deadline(); !ok { + panic("can't use context w/o deadline in realize()") + } err := retry.WithBackoff( ctx, func(ctx context.Context) error { - takeover = false + takeover = "" otherResponsible = false isoLvl := sql.LevelSerializable selectLock := "" - if h.db.DriverName() == driver.MySQL { + if h.db.DriverName() == MySQL { // The RDBMS may actually be a Percona XtraDB Cluster which doesn't // support serializable transactions, but only their following equivalent: isoLvl = sql.LevelRepeatableRead @@ -264,25 +302,41 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type } query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance "+ - "WHERE environment_id = ? AND responsible = ? AND id <> ? AND heartbeat > ?") + selectLock + "WHERE environment_id = ? AND responsible = ? AND id <> ?") + selectLock instance := &v1.IcingadbInstance{} + errQuery := tx.QueryRowxContext(ctx, query, envId, "y", h.instanceId).StructScan(instance) + + switch { + case errQuery == nil: + fields := []any{ + zap.String("instance_id", instance.Id.String()), + zap.String("environment", envId.String()), + zap.Time("heartbeat", instance.Heartbeat.Time()), + zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time())), + } - errQuery := tx.QueryRowxContext( - ctx, query, envId, "y", h.instanceId, time.Now().Add(-1*timeout).UnixMilli(), - ).StructScan(instance) - switch errQuery { - case nil: - otherResponsible = true - if shouldLog { - h.logger.Infow("Another instance is active", - zap.String("instance_id", instance.Id.String()), - zap.String("environment", envId.String()), - "heartbeat", instance.Heartbeat, - zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time()))) + if instance.Heartbeat.Time().Before(time.Now().Add(-1 * peerTimeout)) { + takeover = "other instance's heartbeat has expired" + h.logger.Debugw("Preparing to take over HA as other instance's heartbeat has expired", fields...) + } else { + otherResponsible = true + if shouldLogRoutineEvents { + h.logger.Infow("Another instance is active", fields...) + } + } + + case errors.Is(errQuery, sql.ErrNoRows): + fields := []any{ + zap.String("instance_id", h.instanceId.String()), + zap.String("environment", envId.String())} + if !h.responsible { + takeover = "no other instance is active" + h.logger.Debugw("Preparing to take over HA as no instance is active", fields...) + } else if h.responsible && shouldLogRoutineEvents { + h.logger.Debugw("Continuing being the active instance", fields...) 
} - case sql.ErrNoRows: - takeover = true + default: return internal.CantPerformQuery(errQuery, query) } @@ -297,7 +351,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type EnvironmentId: envId, }, Heartbeat: *t, - Responsible: types.Bool{Bool: takeover || h.responsible, Valid: true}, + Responsible: types.Bool{Bool: takeover != "" || h.responsible, Valid: true}, EndpointId: s.EndpointId, Icinga2Version: s.Version, Icinga2StartTime: s.ProgramStart, @@ -314,7 +368,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type return internal.CantPerformQuery(err, stmt) } - if takeover { + if takeover != "" { stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?") _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId) @@ -330,16 +384,33 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type return nil }, retry.Retryable, - backoff.NewExponentialWithJitter(time.Millisecond*256, time.Second*3), + backoff.NewExponentialWithJitter(256*time.Millisecond, 3*time.Second), retry.Settings{ - OnError: func(_ time.Duration, attempt uint64, err, lastErr error) { + // Intentionally no timeout is set, as we use a context with a deadline. + OnRetryableError: func(_ time.Duration, attempt uint64, err, lastErr error) { if lastErr == nil || err.Error() != lastErr.Error() { log := h.logger.Debugw - if attempt > 2 { + if attempt > 3 { + log = h.logger.Infow + } + + log("Can't update or insert instance. Retrying", zap.Error(err)) + } + }, + OnSuccess: func(elapsed time.Duration, attempt uint64, lastErr error) { + if attempt > 1 { + log := h.logger.Debugw + + if attempt > 4 { + // We log errors with severity info starting from the fourth attempt, (see above) + // so we need to log success with severity info from the fifth attempt. log = h.logger.Infow } - log("Can't update or insert instance. Retrying", zap.Error(err), zap.Uint64("retry count", attempt)) + log("Instance updated/inserted successfully after error", + zap.Duration("after", elapsed), + zap.Uint64("attempts", attempt), + zap.NamedError("recovered_error", lastErr)) } }, }, @@ -348,14 +419,14 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type return err } - if takeover { + if takeover != "" { // Insert the environment after each heartbeat takeover if it does not already exist in the database // as the environment may have changed, although this is likely to happen very rarely. if err := h.insertEnvironment(); err != nil { return errors.Wrap(err, "can't insert environment") } - h.signalTakeover() + h.signalTakeover(takeover) } else if otherResponsible { if state, _ := h.state.Load(); !state.otherResponsible { state.otherResponsible = true @@ -366,6 +437,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type return nil } +// realizeLostHeartbeat updates "responsible = n" for this HA into the database. func (h *HA) realizeLostHeartbeat() { stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE id = ?") if _, err := h.db.ExecContext(h.ctx, stmt, "n", h.instanceId); err != nil && !utils.IsContextCanceled(err) { @@ -399,10 +471,10 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar select { case <-h.ctx.Done(): return - case <-time.After(timeout): + case <-time.After(peerTimeout): query := h.db.Rebind("DELETE FROM icingadb_instance " + "WHERE id <> ? AND environment_id = ? AND endpoint_id = ? 
AND heartbeat < ?") - heartbeat := types.UnixMilli(time.Now().Add(-timeout)) + heartbeat := types.UnixMilli(time.Now().Add(-1 * peerTimeout)) result, err := h.db.ExecContext(h.ctx, query, h.instanceId, envId, s.EndpointId, heartbeat) if err != nil { @@ -421,7 +493,8 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar } } -func (h *HA) signalHandover() { +// signalHandover gives up HA.responsible and notifies the HA.Handover chan. +func (h *HA) signalHandover(reason string) { if h.responsible { h.state.Store(haState{ responsibleTsMilli: time.Now().UnixMilli(), @@ -430,7 +503,7 @@ func (h *HA) signalHandover() { }) select { - case h.handover <- struct{}{}: + case h.handover <- reason: h.responsible = false case <-h.ctx.Done(): // Noop @@ -438,7 +511,8 @@ func (h *HA) signalHandover() { } } -func (h *HA) signalTakeover() { +// signalTakeover claims HA.responsible and notifies the HA.Takeover chan. +func (h *HA) signalTakeover(reason string) { if !h.responsible { h.state.Store(haState{ responsibleTsMilli: time.Now().UnixMilli(), @@ -447,7 +521,7 @@ func (h *HA) signalTakeover() { }) select { - case h.takeover <- struct{}{}: + case h.takeover <- reason: h.responsible = true case <-h.ctx.Done(): // Noop diff --git a/pkg/icingadb/history/sla.go b/pkg/icingadb/history/sla.go index 79d22c7..7c0849e 100644 --- a/pkg/icingadb/history/sla.go +++ b/pkg/icingadb/history/sla.go @@ -1,10 +1,10 @@ package history import ( - "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/pkg/icingadb/v1/history" "github.com/icinga/icingadb/pkg/structify" "github.com/icinga/icingadb/pkg/types" + "github.com/redis/go-redis/v9" "reflect" ) diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index dc8bc61..4be0e71 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -2,7 +2,6 @@ package history import ( "context" - "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/contracts" @@ -17,6 +16,7 @@ import ( "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" + "github.com/redis/go-redis/v9" "golang.org/x/sync/errgroup" "reflect" "sync" @@ -144,7 +144,11 @@ func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redi stream := "icinga:history:stream:" + key for { select { - case bulk := <-bulks: + case bulk, ok := <-bulks: + if !ok { + return nil + } + ids := make([]string, len(bulk)) for i := range bulk { ids[i] = bulk[i].ID diff --git a/pkg/icingadb/objectpacker/objectpacker.go b/pkg/icingadb/objectpacker/objectpacker.go index 9ddfdc8..0152745 100644 --- a/pkg/icingadb/objectpacker/objectpacker.go +++ b/pkg/icingadb/objectpacker/objectpacker.go @@ -6,7 +6,6 @@ import ( "fmt" "github.com/pkg/errors" "io" - "io/ioutil" "reflect" "sort" ) @@ -102,7 +101,7 @@ func packValue(in reflect.Value, out io.Writer) error { // If there aren't any values to pack, ... if l < 1 { // ... create one and pack it - panics on disallowed type. - _ = packValue(reflect.Zero(in.Type().Elem()), ioutil.Discard) + _ = packValue(reflect.Zero(in.Type().Elem()), io.Discard) } return nil @@ -140,13 +139,13 @@ func packValue(in reflect.Value, out io.Writer) error { packedKey = key.Slice(0, key.Len()).Interface().([]byte) } else { // Not just stringify the key (below), but also pack it (here) - panics on disallowed type. 
- _ = packValue(iter.Key(), ioutil.Discard) + _ = packValue(iter.Key(), io.Discard) packedKey = []byte(fmt.Sprint(key.Interface())) } } else { // Not just stringify the key (below), but also pack it (here) - panics on disallowed type. - _ = packValue(iter.Key(), ioutil.Discard) + _ = packValue(iter.Key(), io.Discard) packedKey = []byte(fmt.Sprint(key.Interface())) } @@ -176,8 +175,8 @@ func packValue(in reflect.Value, out io.Writer) error { typ := in.Type() // ... create one and pack it - panics on disallowed type. - _ = packValue(reflect.Zero(typ.Key()), ioutil.Discard) - _ = packValue(reflect.Zero(typ.Elem()), ioutil.Discard) + _ = packValue(reflect.Zero(typ.Key()), io.Discard) + _ = packValue(reflect.Zero(typ.Elem()), io.Discard) } return nil @@ -186,7 +185,7 @@ func packValue(in reflect.Value, out io.Writer) error { err := packValue(reflect.Value{}, out) // Create a fictive referenced value and pack it - panics on disallowed type. - _ = packValue(reflect.Zero(in.Type().Elem()), ioutil.Discard) + _ = packValue(reflect.Zero(in.Type().Elem()), io.Discard) return err } else { diff --git a/pkg/icingadb/overdue/sync.go b/pkg/icingadb/overdue/sync.go index 5cd4d67..377592a 100644 --- a/pkg/icingadb/overdue/sync.go +++ b/pkg/icingadb/overdue/sync.go @@ -4,7 +4,6 @@ import ( "context" _ "embed" "fmt" - "github.com/go-redis/redis/v8" "github.com/google/uuid" "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/com" @@ -17,6 +16,7 @@ import ( "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/pkg/errors" + "github.com/redis/go-redis/v9" "golang.org/x/sync/errgroup" "regexp" "strconv" diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index dfee9c0..a56263a 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -3,7 +3,6 @@ package icingadb import ( "context" "fmt" - "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" @@ -15,6 +14,7 @@ import ( "github.com/icinga/icingadb/pkg/structify" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" + "github.com/redis/go-redis/v9" "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" diff --git a/pkg/icingadb/v1/checkable.go b/pkg/icingadb/v1/checkable.go index dbb114c..4b1efeb 100644 --- a/pkg/icingadb/v1/checkable.go +++ b/pkg/icingadb/v1/checkable.go @@ -30,7 +30,7 @@ type Checkable struct { IconImageAlt string `json:"icon_image_alt"` IconImageId types.Binary `json:"icon_image_id"` IsVolatile types.Bool `json:"is_volatile"` - MaxCheckAttempts float64 `json:"max_check_attempts"` + MaxCheckAttempts uint32 `json:"max_check_attempts"` Notes string `json:"notes"` NotesUrlId types.Binary `json:"notes_url_id"` NotificationsEnabled types.Bool `json:"notifications_enabled"` diff --git a/pkg/icingadb/v1/history/state.go b/pkg/icingadb/v1/history/state.go index dec13b0..6320b73 100644 --- a/pkg/icingadb/v1/history/state.go +++ b/pkg/icingadb/v1/history/state.go @@ -14,7 +14,7 @@ type StateHistory struct { HardState uint8 `json:"hard_state"` PreviousSoftState uint8 `json:"previous_soft_state"` PreviousHardState uint8 `json:"previous_hard_state"` - CheckAttempt uint8 `json:"check_attempt"` + CheckAttempt uint32 `json:"check_attempt"` Output types.String `json:"output"` LongOutput types.String `json:"long_output"` MaxCheckAttempts uint32 `json:"max_check_attempts"` diff --git a/pkg/icingadb/v1/state.go 
b/pkg/icingadb/v1/state.go index bad8f28..983b14d 100644 --- a/pkg/icingadb/v1/state.go +++ b/pkg/icingadb/v1/state.go @@ -9,7 +9,7 @@ type State struct { EnvironmentMeta `json:",inline"` AcknowledgementCommentId types.Binary `json:"acknowledgement_comment_id"` LastCommentId types.Binary `json:"last_comment_id"` - CheckAttempt uint8 `json:"check_attempt"` + CheckAttempt uint32 `json:"check_attempt"` CheckCommandline types.String `json:"check_commandline"` CheckSource types.String `json:"check_source"` SchedulingSource types.String `json:"scheduling_source"` diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go index d42713c..c494f95 100644 --- a/pkg/icingaredis/client.go +++ b/pkg/icingaredis/client.go @@ -2,7 +2,6 @@ package icingaredis import ( "context" - "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" @@ -10,6 +9,7 @@ import ( "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" + "github.com/redis/go-redis/v9" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "runtime" diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index 9a8ebad..cb34010 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -2,13 +2,13 @@ package icingaredis import ( "context" - "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/internal" v1 "github.com/icinga/icingadb/pkg/icingaredis/v1" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" + "github.com/redis/go-redis/v9" "go.uber.org/zap" "golang.org/x/sync/errgroup" "sync" @@ -16,9 +16,9 @@ import ( "time" ) -// timeout defines how long a heartbeat may be absent if a heartbeat has already been received. +// Timeout defines how long a heartbeat may be absent if a heartbeat has already been received. // After this time, a heartbeat loss is propagated. -var timeout = 60 * time.Second +const Timeout = time.Minute // Heartbeat periodically reads heartbeats from a Redis stream and signals in Beat channels when they are received. // Also signals on if the heartbeat is Lost. @@ -97,7 +97,7 @@ func (h *Heartbeat) controller(ctx context.Context) { // Message producer loop. g.Go(func() error { // We expect heartbeats every second but only read them every 3 seconds. - throttle := time.NewTicker(time.Second * 3) + throttle := time.NewTicker(3 * time.Second) defer throttle.Stop() for id := "$"; ; { @@ -141,9 +141,9 @@ func (h *Heartbeat) controller(ctx context.Context) { atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli()) h.sendEvent(m) - case <-time.After(timeout): + case <-time.After(Timeout): if h.active { - h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", timeout)) + h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", Timeout)) h.sendEvent(nil) h.active = false } else { @@ -217,5 +217,5 @@ func (m *HeartbeatMessage) EnvironmentID() (types.Binary, error) { // ExpiryTime returns the timestamp when the heartbeat expires. 
func (m *HeartbeatMessage) ExpiryTime() time.Time { - return m.received.Add(timeout) + return m.received.Add(Timeout) } diff --git a/pkg/icingaredis/telemetry/heartbeat.go b/pkg/icingaredis/telemetry/heartbeat.go index ee476a1..0057ae0 100644 --- a/pkg/icingaredis/telemetry/heartbeat.go +++ b/pkg/icingaredis/telemetry/heartbeat.go @@ -3,7 +3,6 @@ package telemetry import ( "context" "fmt" - "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/icingaredis" @@ -11,6 +10,7 @@ import ( "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" + "github.com/redis/go-redis/v9" "go.uber.org/zap" "regexp" "runtime/metrics" diff --git a/pkg/icingaredis/telemetry/stats.go b/pkg/icingaredis/telemetry/stats.go index 86db0b3..2b592a5 100644 --- a/pkg/icingaredis/telemetry/stats.go +++ b/pkg/icingaredis/telemetry/stats.go @@ -2,12 +2,12 @@ package telemetry import ( "context" - "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/icingaredis" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/utils" + "github.com/redis/go-redis/v9" "go.uber.org/zap" "strconv" "time" diff --git a/pkg/icingaredis/utils.go b/pkg/icingaredis/utils.go index 9176dba..50c97f9 100644 --- a/pkg/icingaredis/utils.go +++ b/pkg/icingaredis/utils.go @@ -2,13 +2,13 @@ package icingaredis import ( "context" - "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" + "github.com/redis/go-redis/v9" "golang.org/x/sync/errgroup" ) diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index da73943..e5b93de 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -7,12 +7,15 @@ import ( "github.com/icinga/icingadb/pkg/backoff" "github.com/lib/pq" "github.com/pkg/errors" + "io" "net" - "strings" "syscall" "time" ) +// DefaultTimeout is our opinionated default timeout for retrying database and Redis operations. +const DefaultTimeout = 5 * time.Minute + // RetryableFunc is a retryable function. type RetryableFunc func(context.Context) error @@ -21,10 +24,15 @@ type IsRetryable func(error) bool // Settings aggregates optional settings for WithBackoff. type Settings struct { - // Timeout lets WithBackoff give up once elapsed (if >0). + // If >0, Timeout lets WithBackoff stop retrying gracefully once elapsed based on the following criteria: + // * If the execution of RetryableFunc has taken longer than Timeout, no further attempts are made. + // * If Timeout elapses during the sleep phase between retries, one final retry is attempted. + // * RetryableFunc is always granted its full execution time and is not canceled if it exceeds Timeout. + // This means that WithBackoff may not stop exactly after Timeout expires, + // or may not retry at all if the first execution of RetryableFunc already takes longer than Timeout. Timeout time.Duration - // OnError is called if an error occurs. - OnError func(elapsed time.Duration, attempt uint64, err, lastErr error) + // OnRetryableError is called if a retryable error occurs. + OnRetryableError func(elapsed time.Duration, attempt uint64, err, lastErr error) // OnSuccess is called once the operation succeeds. 
OnSuccess func(elapsed time.Duration, attempt uint64, lastErr error) } @@ -34,16 +42,19 @@ type Settings struct { func WithBackoff( ctx context.Context, retryableFunc RetryableFunc, retryable IsRetryable, b backoff.Backoff, settings Settings, ) (err error) { - parentCtx := ctx + // Channel for retry deadline, which is set to the channel of NewTimer() if a timeout is configured, + // otherwise nil, so that it blocks forever if there is no timeout. + var timeout <-chan time.Time if settings.Timeout > 0 { - var cancelCtx context.CancelFunc - ctx, cancelCtx = context.WithTimeout(ctx, settings.Timeout) - defer cancelCtx() + t := time.NewTimer(settings.Timeout) + defer t.Stop() + timeout = t.C } start := time.Now() - for attempt := uint64(0); ; /* true */ attempt++ { + timedOut := false + for attempt := uint64(1); ; /* true */ attempt++ { prevErr := err if err = retryableFunc(ctx); err == nil { @@ -54,43 +65,72 @@ func WithBackoff( return } - if settings.OnError != nil { - settings.OnError(time.Since(start), attempt, err, prevErr) + // Retryable function may have exited prematurely due to context errors. + // We explicitly check the context error here, as the error returned by the retryable function can pass the + // error.Is() checks even though it is not a real context error, e.g. + // https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/net/net.go;l=422 + // https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/net/net.go;l=601 + if errors.Is(ctx.Err(), context.DeadlineExceeded) || errors.Is(ctx.Err(), context.Canceled) { + if prevErr != nil { + err = errors.Wrap(err, prevErr.Error()) + } + + return } - isRetryable := retryable(err) + if !retryable(err) { + err = errors.Wrap(err, "can't retry") - if prevErr != nil && (errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) { - err = prevErr + return } - if !isRetryable { - err = errors.Wrap(err, "can't retry") + select { + case <-timeout: + // Stop retrying immediately if executing the retryable function took longer than the timeout. + timedOut = true + default: + } + + if timedOut { + err = errors.Wrap(err, "retry deadline exceeded") return } - sleep := b(attempt) + if settings.OnRetryableError != nil { + settings.OnRetryableError(time.Since(start), attempt, err, prevErr) + } + select { + case <-time.After(b(attempt)): + case <-timeout: + // Do not stop retrying immediately, but start one last attempt to mitigate timing issues where + // the timeout expires while waiting for the next attempt and + // therefore no retries have happened during this possibly long period. + timedOut = true case <-ctx.Done(): - if outerErr := parentCtx.Err(); outerErr != nil { - err = errors.Wrap(outerErr, "outer context canceled") - } else { - if err == nil { - err = ctx.Err() - } - err = errors.Wrap(err, "can't retry") - } + err = errors.Wrap(ctx.Err(), err.Error()) return - case <-time.After(sleep): } } } +// ResetTimeout changes the possibly expired timer t to expire after duration d. +// +// If the timer has already expired and nothing has been received from its channel, +// it is automatically drained as if the timer had never expired. +func ResetTimeout(t *time.Timer, d time.Duration) { + if !t.Stop() { + <-t.C + } + + t.Reset(d) +} + // Retryable returns true for common errors that are considered retryable, // i.e. temporary, timeout, DNS, connection refused and reset, host down and unreachable and -// network down and unreachable errors. +// network down and unreachable errors. 
In addition, any database error is considered retryable. func Retryable(err error) bool { var temporary interface { Temporary() bool @@ -133,6 +173,12 @@ func Retryable(err error) bool { if errors.Is(err, syscall.ENETDOWN) || errors.Is(err, syscall.ENETUNREACH) { return true } + if errors.Is(err, syscall.EPIPE) { + return true + } + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + return true + } if errors.Is(err, driver.ErrBadConn) { return true @@ -141,43 +187,10 @@ func Retryable(err error) bool { return true } - var e *mysql.MySQLError - if errors.As(err, &e) { - switch e.Number { - case 1053, 1205, 1213, 2006: - // 1053: Server shutdown in progress - // 1205: Lock wait timeout - // 1213: Deadlock found when trying to get lock - // 2006: MySQL server has gone away - return true - default: - return false - } - } - - var pe *pq.Error - if errors.As(err, &pe) { - switch pe.Code { - case "08000", // connection_exception - "08006", // connection_failure - "08001", // sqlclient_unable_to_establish_sqlconnection - "08004", // sqlserver_rejected_establishment_of_sqlconnection - "40001", // serialization_failure - "40P01", // deadlock_detected - "54000", // program_limit_exceeded - "55006", // object_in_use - "55P03", // lock_not_available - "57P01", // admin_shutdown - "57P02", // crash_shutdown - "57P03", // cannot_connect_now - "58000", // system_error - "58030", // io_error - "XX000": // internal_error - return true - default: - // Class 53 - Insufficient Resources - return strings.HasPrefix(string(pe.Code), "53") - } + var mye *mysql.MySQLError + var pqe *pq.Error + if errors.As(err, &mye) || errors.As(err, &pqe) { + return true } return false diff --git a/pkg/types/string.go b/pkg/types/string.go index f8ead45..ce2a4ac 100644 --- a/pkg/types/string.go +++ b/pkg/types/string.go @@ -15,6 +15,14 @@ type String struct { sql.NullString } +// MakeString constructs a new non-NULL String from s. +func MakeString(s string) String { + return String{sql.NullString{ + String: s, + Valid: true, + }} +} + // MarshalJSON implements the json.Marshaler interface. // Supports JSON null. func (s String) MarshalJSON() ([]byte, error) { |