diff options
Diffstat (limited to 'pkg/config')
-rw-r--r-- | pkg/config/config.go | 24 | ||||
-rw-r--r-- | pkg/config/config_test.go | 50 | ||||
-rw-r--r-- | pkg/config/database.go | 75 | ||||
-rw-r--r-- | pkg/config/redis.go | 10 |
4 files changed, 90 insertions, 69 deletions
diff --git a/pkg/config/config.go b/pkg/config/config.go index a683014..744f4c3 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1,15 +1,12 @@ package config import ( - "bytes" "crypto/tls" "crypto/x509" - "fmt" "github.com/creasty/defaults" "github.com/goccy/go-yaml" "github.com/jessevdk/go-flags" "github.com/pkg/errors" - "io/ioutil" "os" ) @@ -19,8 +16,6 @@ type Config struct { Redis Redis `yaml:"redis"` Logging Logging `yaml:"logging"` Retention Retention `yaml:"retention"` - - DecodeWarning error `yaml:"-"` } // Validate checks constraints in the supplied configuration and returns an error if they are violated. @@ -51,13 +46,14 @@ type Flags struct { // FromYAMLFile returns a new Config value created from the given YAML config file. func FromYAMLFile(name string) (*Config, error) { - f, err := os.ReadFile(name) + f, err := os.Open(name) if err != nil { - return nil, errors.Wrap(err, "can't read YAML file "+name) + return nil, errors.Wrap(err, "can't open YAML file "+name) } + defer f.Close() c := &Config{} - d := yaml.NewDecoder(bytes.NewReader(f)) + d := yaml.NewDecoder(f, yaml.DisallowUnknownField()) if err := defaults.Set(c); err != nil { return nil, errors.Wrap(err, "can't set config defaults") @@ -67,16 +63,8 @@ func FromYAMLFile(name string) (*Config, error) { return nil, errors.Wrap(err, "can't parse YAML file "+name) } - // Decode again with yaml.DisallowUnknownField() (like v1.2 will do) and issue a warning if it returns an error. - c.DecodeWarning = yaml.NewDecoder(bytes.NewReader(f), yaml.DisallowUnknownField()).Decode(&Config{}) - if err := c.Validate(); err != nil { - const msg = "invalid configuration" - if warn := c.DecodeWarning; warn != nil { - return nil, fmt.Errorf("%s: %w\n\nwarning: ignored unknown config option:\n\n%v", msg, err, warn) - } else { - return nil, errors.Wrap(err, msg) - } + return nil, errors.Wrap(err, "invalid configuration") } return c, nil @@ -129,7 +117,7 @@ func (t *TLS) MakeConfig(serverName string) (*tls.Config, error) { if t.Insecure { tlsConfig.InsecureSkipVerify = true } else if t.Ca != "" { - raw, err := ioutil.ReadFile(t.Ca) + raw, err := os.ReadFile(t.Ca) if err != nil { return nil, errors.Wrap(err, "can't read CA file") } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 2418094..94e3773 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -20,34 +20,33 @@ redis: host: 2001:db8::1 ` - miniOutput := &Config{} - _ = defaults.Set(miniOutput) - - miniOutput.Database.Host = "192.0.2.1" - miniOutput.Database.Database = "icingadb" - miniOutput.Database.User = "icingadb" - miniOutput.Database.Password = "icingadb" - - miniOutput.Redis.Host = "2001:db8::1" - miniOutput.Logging.Output = logging.CONSOLE - subtests := []struct { name string input string output *Config - warn bool }{ { - name: "mini", - input: miniConf, - output: miniOutput, - warn: false, + name: "mini", + input: miniConf, + output: func() *Config { + c := &Config{} + _ = defaults.Set(c) + + c.Database.Host = "192.0.2.1" + c.Database.Database = "icingadb" + c.Database.User = "icingadb" + c.Database.Password = "icingadb" + + c.Redis.Host = "2001:db8::1" + c.Logging.Output = logging.CONSOLE + + return c + }(), }, { name: "mini-with-unknown", input: miniConf + "\nunknown: 42", - output: miniOutput, - warn: true, + output: nil, }, } @@ -59,19 +58,12 @@ redis: require.NoError(t, os.WriteFile(tempFile.Name(), []byte(st.input), 0o600)) - actual, err := FromYAMLFile(tempFile.Name()) - require.NoError(t, err) - - if st.warn { - require.Error(t, actual.DecodeWarning, "reading config should produce a warning") - - // Reset the warning so that the following require.Equal() doesn't try to compare it. - actual.DecodeWarning = nil + if actual, err := FromYAMLFile(tempFile.Name()); st.output == nil { + require.Error(t, err) } else { - require.NoError(t, actual.DecodeWarning, "reading config should not produce a warning") + require.NoError(t, err) + require.Equal(t, st.output, actual) } - - require.Equal(t, st.output, actual) }) } } diff --git a/pkg/config/database.go b/pkg/config/database.go index b42ff8e..0895d26 100644 --- a/pkg/config/database.go +++ b/pkg/config/database.go @@ -1,25 +1,25 @@ package config import ( + "context" + "database/sql" + "database/sql/driver" "fmt" "github.com/go-sql-driver/mysql" - "github.com/icinga/icingadb/pkg/driver" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx/reflectx" + "github.com/lib/pq" "github.com/pkg/errors" "net" "net/url" "strconv" "strings" - "sync" "time" ) -var registerDriverOnce sync.Once - // Database defines database client configuration. type Database struct { Type string `yaml:"type" default:"mysql"` @@ -35,17 +35,14 @@ type Database struct { // Open prepares the DSN string and driver configuration, // calls sqlx.Open, but returns *icingadb.DB. func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { - registerDriverOnce.Do(func() { - driver.Register(logger) - }) - - var dsn string + var db *sqlx.DB switch d.Type { case "mysql": config := mysql.NewConfig() config.User = d.User config.Passwd = d.Password + config.Logger = icingadb.MysqlFuncLogger(logger.Debug) if d.isUnixAddr() { config.Net = "unix" @@ -61,7 +58,7 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { config.DBName = d.Database config.Timeout = time.Minute - config.Params = map[string]string{"sql_mode": "ANSI_QUOTES"} + config.Params = map[string]string{"sql_mode": "'TRADITIONAL,ANSI_QUOTES'"} tlsConfig, err := d.TlsOptions.MakeConfig(d.Host) if err != nil { @@ -75,7 +72,17 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { } } - dsn = config.FormatDSN() + c, err := mysql.NewConnector(config) + if err != nil { + return nil, errors.Wrap(err, "can't open mysql database") + } + + wsrepSyncWait := int64(d.Options.WsrepSyncWait) + setWsrepSyncWait := func(ctx context.Context, conn driver.Conn) error { + return setGaleraOpts(ctx, conn, wsrepSyncWait) + } + + db = sqlx.NewDb(sql.OpenDB(icingadb.NewConnector(c, logger, setWsrepSyncWait)), icingadb.MySQL) case "pgsql": uri := &url.URL{ Scheme: "postgres", @@ -123,16 +130,17 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { } uri.RawQuery = query.Encode() - dsn = uri.String() + + connector, err := pq.NewConnector(uri.String()) + if err != nil { + return nil, errors.Wrap(err, "can't open pgsql database") + } + + db = sqlx.NewDb(sql.OpenDB(icingadb.NewConnector(connector, logger, nil)), icingadb.PostgreSQL) default: return nil, unknownDbType(d.Type) } - db, err := sqlx.Open("icingadb-"+d.Type, dsn) - if err != nil { - return nil, errors.Wrap(err, "can't open database") - } - db.SetMaxIdleConns(d.Options.MaxConnections / 3) db.SetMaxOpenConns(d.Options.MaxConnections) @@ -173,3 +181,36 @@ func (d *Database) isUnixAddr() bool { func unknownDbType(t string) error { return errors.Errorf(`unknown database type %q, must be one of: "mysql", "pgsql"`, t) } + +// setGaleraOpts sets the "wsrep_sync_wait" variable for each session ensures that causality checks are performed +// before execution and that each statement is executed on a fully synchronized node. Doing so prevents foreign key +// violation when inserting into dependent tables on different MariaDB/MySQL nodes. When using MySQL single nodes, +// the "SET SESSION" command will fail with "Unknown system variable (1193)" and will therefore be silently dropped. +// +// https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait +func setGaleraOpts(ctx context.Context, conn driver.Conn, wsrepSyncWait int64) error { + const galeraOpts = "SET SESSION wsrep_sync_wait=?" + + stmt, err := conn.(driver.ConnPrepareContext).PrepareContext(ctx, galeraOpts) + if err != nil { + if errors.Is(err, &mysql.MySQLError{Number: 1193}) { // Unknown system variable + return nil + } + + return errors.Wrap(err, "cannot prepare "+galeraOpts) + } + // This is just for an unexpected exit and any returned error can safely be ignored and in case + // of the normal function exit, the stmt is closed manually, and its error is handled gracefully. + defer func() { _ = stmt.Close() }() + + _, err = stmt.(driver.StmtExecContext).ExecContext(ctx, []driver.NamedValue{{Value: wsrepSyncWait}}) + if err != nil { + return errors.Wrap(err, "cannot execute "+galeraOpts) + } + + if err = stmt.Close(); err != nil { + return errors.Wrap(err, "cannot close prepared statement "+galeraOpts) + } + + return nil +} diff --git a/pkg/config/redis.go b/pkg/config/redis.go index 38571e3..ad8b31a 100644 --- a/pkg/config/redis.go +++ b/pkg/config/redis.go @@ -4,13 +4,13 @@ import ( "context" "crypto/tls" "fmt" - "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/pkg/backoff" "github.com/icinga/icingadb/pkg/icingaredis" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/retry" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" + "github.com/redis/go-redis/v9" "go.uber.org/zap" "net" "strings" @@ -85,16 +85,16 @@ func dialWithLogging(dialer ctxDialerFunc, logger *logging.Logger) ctxDialerFunc retry.Retryable, backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), retry.Settings{ - Timeout: 5 * time.Minute, - OnError: func(_ time.Duration, _ uint64, err, lastErr error) { + Timeout: retry.DefaultTimeout, + OnRetryableError: func(_ time.Duration, _ uint64, err, lastErr error) { if lastErr == nil || err.Error() != lastErr.Error() { logger.Warnw("Can't connect to Redis. Retrying", zap.Error(err)) } }, OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) { - if attempt > 0 { + if attempt > 1 { logger.Infow("Reconnected to Redis", - zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1)) + zap.Duration("after", elapsed), zap.Uint64("attempts", attempt)) } }, }, |