summaryrefslogtreecommitdiffstats
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/backoff/backoff.go4
-rw-r--r--pkg/config/config.go24
-rw-r--r--pkg/config/config_test.go50
-rw-r--r--pkg/config/database.go75
-rw-r--r--pkg/config/redis.go10
-rw-r--r--pkg/driver/driver.go114
-rw-r--r--pkg/driver/pgsql.go22
-rw-r--r--pkg/flatten/flatten.go11
-rw-r--r--pkg/flatten/flatten_test.go45
-rw-r--r--pkg/icingadb/cleanup.go47
-rw-r--r--pkg/icingadb/db.go66
-rw-r--r--pkg/icingadb/driver.go90
-rw-r--r--pkg/icingadb/dump_signals.go2
-rw-r--r--pkg/icingadb/ha.go208
-rw-r--r--pkg/icingadb/history/sla.go2
-rw-r--r--pkg/icingadb/history/sync.go8
-rw-r--r--pkg/icingadb/objectpacker/objectpacker.go13
-rw-r--r--pkg/icingadb/overdue/sync.go2
-rw-r--r--pkg/icingadb/runtime_updates.go2
-rw-r--r--pkg/icingadb/v1/checkable.go2
-rw-r--r--pkg/icingadb/v1/history/state.go2
-rw-r--r--pkg/icingadb/v1/state.go2
-rw-r--r--pkg/icingaredis/client.go2
-rw-r--r--pkg/icingaredis/heartbeat.go14
-rw-r--r--pkg/icingaredis/telemetry/heartbeat.go2
-rw-r--r--pkg/icingaredis/telemetry/stats.go2
-rw-r--r--pkg/icingaredis/utils.go2
-rw-r--r--pkg/retry/retry.go141
-rw-r--r--pkg/types/string.go8
29 files changed, 572 insertions, 400 deletions
diff --git a/pkg/backoff/backoff.go b/pkg/backoff/backoff.go
index 6ce7bee..e79a1ee 100644
--- a/pkg/backoff/backoff.go
+++ b/pkg/backoff/backoff.go
@@ -14,10 +14,10 @@ type Backoff func(uint64) time.Duration
// It panics if min >= max.
func NewExponentialWithJitter(min, max time.Duration) Backoff {
if min <= 0 {
- min = time.Millisecond * 100
+ min = 100 * time.Millisecond
}
if max <= 0 {
- max = time.Second * 10
+ max = 10 * time.Second
}
if min >= max {
panic("max must be larger than min")
diff --git a/pkg/config/config.go b/pkg/config/config.go
index a683014..744f4c3 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -1,15 +1,12 @@
package config
import (
- "bytes"
"crypto/tls"
"crypto/x509"
- "fmt"
"github.com/creasty/defaults"
"github.com/goccy/go-yaml"
"github.com/jessevdk/go-flags"
"github.com/pkg/errors"
- "io/ioutil"
"os"
)
@@ -19,8 +16,6 @@ type Config struct {
Redis Redis `yaml:"redis"`
Logging Logging `yaml:"logging"`
Retention Retention `yaml:"retention"`
-
- DecodeWarning error `yaml:"-"`
}
// Validate checks constraints in the supplied configuration and returns an error if they are violated.
@@ -51,13 +46,14 @@ type Flags struct {
// FromYAMLFile returns a new Config value created from the given YAML config file.
func FromYAMLFile(name string) (*Config, error) {
- f, err := os.ReadFile(name)
+ f, err := os.Open(name)
if err != nil {
- return nil, errors.Wrap(err, "can't read YAML file "+name)
+ return nil, errors.Wrap(err, "can't open YAML file "+name)
}
+ defer f.Close()
c := &Config{}
- d := yaml.NewDecoder(bytes.NewReader(f))
+ d := yaml.NewDecoder(f, yaml.DisallowUnknownField())
if err := defaults.Set(c); err != nil {
return nil, errors.Wrap(err, "can't set config defaults")
@@ -67,16 +63,8 @@ func FromYAMLFile(name string) (*Config, error) {
return nil, errors.Wrap(err, "can't parse YAML file "+name)
}
- // Decode again with yaml.DisallowUnknownField() (like v1.2 will do) and issue a warning if it returns an error.
- c.DecodeWarning = yaml.NewDecoder(bytes.NewReader(f), yaml.DisallowUnknownField()).Decode(&Config{})
-
if err := c.Validate(); err != nil {
- const msg = "invalid configuration"
- if warn := c.DecodeWarning; warn != nil {
- return nil, fmt.Errorf("%s: %w\n\nwarning: ignored unknown config option:\n\n%v", msg, err, warn)
- } else {
- return nil, errors.Wrap(err, msg)
- }
+ return nil, errors.Wrap(err, "invalid configuration")
}
return c, nil
@@ -129,7 +117,7 @@ func (t *TLS) MakeConfig(serverName string) (*tls.Config, error) {
if t.Insecure {
tlsConfig.InsecureSkipVerify = true
} else if t.Ca != "" {
- raw, err := ioutil.ReadFile(t.Ca)
+ raw, err := os.ReadFile(t.Ca)
if err != nil {
return nil, errors.Wrap(err, "can't read CA file")
}
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index 2418094..94e3773 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -20,34 +20,33 @@ redis:
host: 2001:db8::1
`
- miniOutput := &Config{}
- _ = defaults.Set(miniOutput)
-
- miniOutput.Database.Host = "192.0.2.1"
- miniOutput.Database.Database = "icingadb"
- miniOutput.Database.User = "icingadb"
- miniOutput.Database.Password = "icingadb"
-
- miniOutput.Redis.Host = "2001:db8::1"
- miniOutput.Logging.Output = logging.CONSOLE
-
subtests := []struct {
name string
input string
output *Config
- warn bool
}{
{
- name: "mini",
- input: miniConf,
- output: miniOutput,
- warn: false,
+ name: "mini",
+ input: miniConf,
+ output: func() *Config {
+ c := &Config{}
+ _ = defaults.Set(c)
+
+ c.Database.Host = "192.0.2.1"
+ c.Database.Database = "icingadb"
+ c.Database.User = "icingadb"
+ c.Database.Password = "icingadb"
+
+ c.Redis.Host = "2001:db8::1"
+ c.Logging.Output = logging.CONSOLE
+
+ return c
+ }(),
},
{
name: "mini-with-unknown",
input: miniConf + "\nunknown: 42",
- output: miniOutput,
- warn: true,
+ output: nil,
},
}
@@ -59,19 +58,12 @@ redis:
require.NoError(t, os.WriteFile(tempFile.Name(), []byte(st.input), 0o600))
- actual, err := FromYAMLFile(tempFile.Name())
- require.NoError(t, err)
-
- if st.warn {
- require.Error(t, actual.DecodeWarning, "reading config should produce a warning")
-
- // Reset the warning so that the following require.Equal() doesn't try to compare it.
- actual.DecodeWarning = nil
+ if actual, err := FromYAMLFile(tempFile.Name()); st.output == nil {
+ require.Error(t, err)
} else {
- require.NoError(t, actual.DecodeWarning, "reading config should not produce a warning")
+ require.NoError(t, err)
+ require.Equal(t, st.output, actual)
}
-
- require.Equal(t, st.output, actual)
})
}
}
diff --git a/pkg/config/database.go b/pkg/config/database.go
index b42ff8e..0895d26 100644
--- a/pkg/config/database.go
+++ b/pkg/config/database.go
@@ -1,25 +1,25 @@
package config
import (
+ "context"
+ "database/sql"
+ "database/sql/driver"
"fmt"
"github.com/go-sql-driver/mysql"
- "github.com/icinga/icingadb/pkg/driver"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/utils"
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/reflectx"
+ "github.com/lib/pq"
"github.com/pkg/errors"
"net"
"net/url"
"strconv"
"strings"
- "sync"
"time"
)
-var registerDriverOnce sync.Once
-
// Database defines database client configuration.
type Database struct {
Type string `yaml:"type" default:"mysql"`
@@ -35,17 +35,14 @@ type Database struct {
// Open prepares the DSN string and driver configuration,
// calls sqlx.Open, but returns *icingadb.DB.
func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) {
- registerDriverOnce.Do(func() {
- driver.Register(logger)
- })
-
- var dsn string
+ var db *sqlx.DB
switch d.Type {
case "mysql":
config := mysql.NewConfig()
config.User = d.User
config.Passwd = d.Password
+ config.Logger = icingadb.MysqlFuncLogger(logger.Debug)
if d.isUnixAddr() {
config.Net = "unix"
@@ -61,7 +58,7 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) {
config.DBName = d.Database
config.Timeout = time.Minute
- config.Params = map[string]string{"sql_mode": "ANSI_QUOTES"}
+ config.Params = map[string]string{"sql_mode": "'TRADITIONAL,ANSI_QUOTES'"}
tlsConfig, err := d.TlsOptions.MakeConfig(d.Host)
if err != nil {
@@ -75,7 +72,17 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) {
}
}
- dsn = config.FormatDSN()
+ c, err := mysql.NewConnector(config)
+ if err != nil {
+ return nil, errors.Wrap(err, "can't open mysql database")
+ }
+
+ wsrepSyncWait := int64(d.Options.WsrepSyncWait)
+ setWsrepSyncWait := func(ctx context.Context, conn driver.Conn) error {
+ return setGaleraOpts(ctx, conn, wsrepSyncWait)
+ }
+
+ db = sqlx.NewDb(sql.OpenDB(icingadb.NewConnector(c, logger, setWsrepSyncWait)), icingadb.MySQL)
case "pgsql":
uri := &url.URL{
Scheme: "postgres",
@@ -123,16 +130,17 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) {
}
uri.RawQuery = query.Encode()
- dsn = uri.String()
+
+ connector, err := pq.NewConnector(uri.String())
+ if err != nil {
+ return nil, errors.Wrap(err, "can't open pgsql database")
+ }
+
+ db = sqlx.NewDb(sql.OpenDB(icingadb.NewConnector(connector, logger, nil)), icingadb.PostgreSQL)
default:
return nil, unknownDbType(d.Type)
}
- db, err := sqlx.Open("icingadb-"+d.Type, dsn)
- if err != nil {
- return nil, errors.Wrap(err, "can't open database")
- }
-
db.SetMaxIdleConns(d.Options.MaxConnections / 3)
db.SetMaxOpenConns(d.Options.MaxConnections)
@@ -173,3 +181,36 @@ func (d *Database) isUnixAddr() bool {
func unknownDbType(t string) error {
return errors.Errorf(`unknown database type %q, must be one of: "mysql", "pgsql"`, t)
}
+
+// setGaleraOpts sets the "wsrep_sync_wait" variable for each session ensures that causality checks are performed
+// before execution and that each statement is executed on a fully synchronized node. Doing so prevents foreign key
+// violation when inserting into dependent tables on different MariaDB/MySQL nodes. When using MySQL single nodes,
+// the "SET SESSION" command will fail with "Unknown system variable (1193)" and will therefore be silently dropped.
+//
+// https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait
+func setGaleraOpts(ctx context.Context, conn driver.Conn, wsrepSyncWait int64) error {
+ const galeraOpts = "SET SESSION wsrep_sync_wait=?"
+
+ stmt, err := conn.(driver.ConnPrepareContext).PrepareContext(ctx, galeraOpts)
+ if err != nil {
+ if errors.Is(err, &mysql.MySQLError{Number: 1193}) { // Unknown system variable
+ return nil
+ }
+
+ return errors.Wrap(err, "cannot prepare "+galeraOpts)
+ }
+ // This is just for an unexpected exit and any returned error can safely be ignored and in case
+ // of the normal function exit, the stmt is closed manually, and its error is handled gracefully.
+ defer func() { _ = stmt.Close() }()
+
+ _, err = stmt.(driver.StmtExecContext).ExecContext(ctx, []driver.NamedValue{{Value: wsrepSyncWait}})
+ if err != nil {
+ return errors.Wrap(err, "cannot execute "+galeraOpts)
+ }
+
+ if err = stmt.Close(); err != nil {
+ return errors.Wrap(err, "cannot close prepared statement "+galeraOpts)
+ }
+
+ return nil
+}
diff --git a/pkg/config/redis.go b/pkg/config/redis.go
index 38571e3..ad8b31a 100644
--- a/pkg/config/redis.go
+++ b/pkg/config/redis.go
@@ -4,13 +4,13 @@ import (
"context"
"crypto/tls"
"fmt"
- "github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/pkg/backoff"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/retry"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
+ "github.com/redis/go-redis/v9"
"go.uber.org/zap"
"net"
"strings"
@@ -85,16 +85,16 @@ func dialWithLogging(dialer ctxDialerFunc, logger *logging.Logger) ctxDialerFunc
retry.Retryable,
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
retry.Settings{
- Timeout: 5 * time.Minute,
- OnError: func(_ time.Duration, _ uint64, err, lastErr error) {
+ Timeout: retry.DefaultTimeout,
+ OnRetryableError: func(_ time.Duration, _ uint64, err, lastErr error) {
if lastErr == nil || err.Error() != lastErr.Error() {
logger.Warnw("Can't connect to Redis. Retrying", zap.Error(err))
}
},
OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) {
- if attempt > 0 {
+ if attempt > 1 {
logger.Infow("Reconnected to Redis",
- zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1))
+ zap.Duration("after", elapsed), zap.Uint64("attempts", attempt))
}
},
},
diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go
deleted file mode 100644
index f529db4..0000000
--- a/pkg/driver/driver.go
+++ /dev/null
@@ -1,114 +0,0 @@
-package driver
-
-import (
- "context"
- "database/sql"
- "database/sql/driver"
- "github.com/go-sql-driver/mysql"
- "github.com/icinga/icingadb/pkg/backoff"
- "github.com/icinga/icingadb/pkg/icingaredis/telemetry"
- "github.com/icinga/icingadb/pkg/logging"
- "github.com/icinga/icingadb/pkg/retry"
- "github.com/jmoiron/sqlx"
- "github.com/pkg/errors"
- "go.uber.org/zap"
- "time"
-)
-
-const MySQL = "icingadb-mysql"
-const PostgreSQL = "icingadb-pgsql"
-
-var timeout = time.Minute * 5
-
-// RetryConnector wraps driver.Connector with retry logic.
-type RetryConnector struct {
- driver.Connector
- driver Driver
-}
-
-// Connect implements part of the driver.Connector interface.
-func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) {
- var conn driver.Conn
- err := errors.Wrap(retry.WithBackoff(
- ctx,
- func(ctx context.Context) (err error) {
- conn, err = c.Connector.Connect(ctx)
- return
- },
- shouldRetry,
- backoff.NewExponentialWithJitter(time.Millisecond*128, time.Minute*1),
- retry.Settings{
- Timeout: timeout,
- OnError: func(_ time.Duration, _ uint64, err, lastErr error) {
- telemetry.UpdateCurrentDbConnErr(err)
-
- if lastErr == nil || err.Error() != lastErr.Error() {
- c.driver.Logger.Warnw("Can't connect to database. Retrying", zap.Error(err))
- }
- },
- OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) {
- telemetry.UpdateCurrentDbConnErr(nil)
-
- if attempt > 0 {
- c.driver.Logger.Infow("Reconnected to database",
- zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1))
- }
- },
- },
- ), "can't connect to database")
- return conn, err
-}
-
-// Driver implements part of the driver.Connector interface.
-func (c RetryConnector) Driver() driver.Driver {
- return c.driver
-}
-
-// Driver wraps a driver.Driver that also must implement driver.DriverContext with logging capabilities and provides our RetryConnector.
-type Driver struct {
- ctxDriver
- Logger *logging.Logger
-}
-
-// OpenConnector implements the DriverContext interface.
-func (d Driver) OpenConnector(name string) (driver.Connector, error) {
- c, err := d.ctxDriver.OpenConnector(name)
- if err != nil {
- return nil, err
- }
-
- return &RetryConnector{
- driver: d,
- Connector: c,
- }, nil
-}
-
-// Register makes our database Driver available under the name "icingadb-*sql".
-func Register(logger *logging.Logger) {
- sql.Register(MySQL, &Driver{ctxDriver: &mysql.MySQLDriver{}, Logger: logger})
- sql.Register(PostgreSQL, &Driver{ctxDriver: PgSQLDriver{}, Logger: logger})
- _ = mysql.SetLogger(mysqlLogger(func(v ...interface{}) { logger.Debug(v...) }))
- sqlx.BindDriver(PostgreSQL, sqlx.DOLLAR)
-}
-
-// ctxDriver helps ensure that we only support drivers that implement driver.Driver and driver.DriverContext.
-type ctxDriver interface {
- driver.Driver
- driver.DriverContext
-}
-
-// mysqlLogger is an adapter that allows ordinary functions to be used as a logger for mysql.SetLogger.
-type mysqlLogger func(v ...interface{})
-
-// Print implements the mysql.Logger interface.
-func (log mysqlLogger) Print(v ...interface{}) {
- log(v)
-}
-
-func shouldRetry(err error) bool {
- if errors.Is(err, driver.ErrBadConn) {
- return true
- }
-
- return retry.Retryable(err)
-}
diff --git a/pkg/driver/pgsql.go b/pkg/driver/pgsql.go
deleted file mode 100644
index 3c88fe0..0000000
--- a/pkg/driver/pgsql.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package driver
-
-import (
- "database/sql/driver"
- "github.com/lib/pq"
-)
-
-// PgSQLDriver extends pq.Driver with driver.DriverContext compliance.
-type PgSQLDriver struct {
- pq.Driver
-}
-
-// Assert interface compliance.
-var (
- _ driver.Driver = PgSQLDriver{}
- _ driver.DriverContext = PgSQLDriver{}
-)
-
-// OpenConnector implements the driver.DriverContext interface.
-func (PgSQLDriver) OpenConnector(name string) (driver.Connector, error) {
- return pq.NewConnector(name)
-}
diff --git a/pkg/flatten/flatten.go b/pkg/flatten/flatten.go
index 94a6e7e..698eff1 100644
--- a/pkg/flatten/flatten.go
+++ b/pkg/flatten/flatten.go
@@ -1,7 +1,6 @@
package flatten
import (
- "database/sql"
"fmt"
"github.com/icinga/icingadb/pkg/types"
"strconv"
@@ -32,12 +31,12 @@ func Flatten(value interface{}, prefix string) map[string]types.String {
for i, v := range value {
flatten(key+"["+strconv.Itoa(i)+"]", v)
}
+ case nil:
+ flattened[key] = types.MakeString("null")
+ case float64:
+ flattened[key] = types.MakeString(strconv.FormatFloat(value, 'f', -1, 64))
default:
- val := "null"
- if value != nil {
- val = fmt.Sprintf("%v", value)
- }
- flattened[key] = types.String{NullString: sql.NullString{String: val, Valid: true}}
+ flattened[key] = types.MakeString(fmt.Sprintf("%v", value))
}
}
diff --git a/pkg/flatten/flatten_test.go b/pkg/flatten/flatten_test.go
new file mode 100644
index 0000000..f84b8d9
--- /dev/null
+++ b/pkg/flatten/flatten_test.go
@@ -0,0 +1,45 @@
+package flatten
+
+import (
+ "github.com/icinga/icingadb/pkg/types"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestFlatten(t *testing.T) {
+ for _, st := range []struct {
+ name string
+ prefix string
+ value any
+ output map[string]types.String
+ }{
+ {"nil", "a", nil, map[string]types.String{"a": types.MakeString("null")}},
+ {"bool", "b", true, map[string]types.String{"b": types.MakeString("true")}},
+ {"int", "c", 42, map[string]types.String{"c": types.MakeString("42")}},
+ {"float", "d", 77.7, map[string]types.String{"d": types.MakeString("77.7")}},
+ {"large_float", "e", 1e23, map[string]types.String{"e": types.MakeString("100000000000000000000000")}},
+ {"string", "f", "\x00", map[string]types.String{"f": types.MakeString("\x00")}},
+ {"nil_slice", "g", []any(nil), map[string]types.String{"g": {}}},
+ {"empty_slice", "h", []any{}, map[string]types.String{"h": {}}},
+ {"slice", "i", []any{nil}, map[string]types.String{"i[0]": types.MakeString("null")}},
+ {"nil_map", "j", map[string]any(nil), map[string]types.String{"j": {}}},
+ {"empty_map", "k", map[string]any{}, map[string]types.String{"k": {}}},
+ {"map", "l", map[string]any{" ": nil}, map[string]types.String{"l. ": types.MakeString("null")}},
+ {"map_with_slice", "m", map[string]any{"\t": []any{"ä", "ö", "ü"}, "ß": "s"}, map[string]types.String{
+ "m.\t[0]": types.MakeString("ä"),
+ "m.\t[1]": types.MakeString("ö"),
+ "m.\t[2]": types.MakeString("ü"),
+ "m.ß": types.MakeString("s"),
+ }},
+ {"slice_with_map", "n", []any{map[string]any{"ä": "a", "ö": "o", "ü": "u"}, "ß"}, map[string]types.String{
+ "n[0].ä": types.MakeString("a"),
+ "n[0].ö": types.MakeString("o"),
+ "n[0].ü": types.MakeString("u"),
+ "n[1]": types.MakeString("ß"),
+ }},
+ } {
+ t.Run(st.name, func(t *testing.T) {
+ assert.Equal(t, st.output, Flatten(st.value, st.prefix))
+ })
+ }
+}
diff --git a/pkg/icingadb/cleanup.go b/pkg/icingadb/cleanup.go
index e57eafa..22bf02d 100644
--- a/pkg/icingadb/cleanup.go
+++ b/pkg/icingadb/cleanup.go
@@ -4,8 +4,9 @@ import (
"context"
"fmt"
"github.com/icinga/icingadb/internal"
+ "github.com/icinga/icingadb/pkg/backoff"
"github.com/icinga/icingadb/pkg/com"
- "github.com/icinga/icingadb/pkg/driver"
+ "github.com/icinga/icingadb/pkg/retry"
"github.com/icinga/icingadb/pkg/types"
"time"
)
@@ -20,10 +21,10 @@ type CleanupStmt struct {
// Build assembles the cleanup statement for the specified database driver with the given limit.
func (stmt *CleanupStmt) Build(driverName string, limit uint64) string {
switch driverName {
- case driver.MySQL, "mysql":
+ case MySQL:
return fmt.Sprintf(`DELETE FROM %[1]s WHERE environment_id = :environment_id AND %[2]s < :time
ORDER BY %[2]s LIMIT %[3]d`, stmt.Table, stmt.Column, limit)
- case driver.PostgreSQL, "postgres":
+ case PostgreSQL:
return fmt.Sprintf(`WITH rows AS (
SELECT %[1]s FROM %[2]s WHERE environment_id = :environment_id AND %[3]s < :time ORDER BY %[3]s LIMIT %[4]d
)
@@ -41,32 +42,46 @@ func (db *DB) CleanupOlderThan(
count uint64, olderThan time.Time, onSuccess ...OnSuccess[struct{}],
) (uint64, error) {
var counter com.Counter
- defer db.log(ctx, stmt.Build(db.DriverName(), 0), &counter).Stop()
+
+ q := db.Rebind(stmt.Build(db.DriverName(), count))
+
+ defer db.log(ctx, q, &counter).Stop()
for {
- q := db.Rebind(stmt.Build(db.DriverName(), count))
- rs, err := db.NamedExecContext(ctx, q, cleanupWhere{
- EnvironmentId: envId,
- Time: types.UnixMilli(olderThan),
- })
- if err != nil {
- return 0, internal.CantPerformQuery(err, q)
- }
+ var rowsDeleted int64
+
+ err := retry.WithBackoff(
+ ctx,
+ func(ctx context.Context) error {
+ rs, err := db.NamedExecContext(ctx, q, cleanupWhere{
+ EnvironmentId: envId,
+ Time: types.UnixMilli(olderThan),
+ })
+ if err != nil {
+ return internal.CantPerformQuery(err, q)
+ }
+
+ rowsDeleted, err = rs.RowsAffected()
- n, err := rs.RowsAffected()
+ return err
+ },
+ retry.Retryable,
+ backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
+ db.getDefaultRetrySettings(),
+ )
if err != nil {
return 0, err
}
- counter.Add(uint64(n))
+ counter.Add(uint64(rowsDeleted))
for _, onSuccess := range onSuccess {
- if err := onSuccess(ctx, make([]struct{}, n)); err != nil {
+ if err := onSuccess(ctx, make([]struct{}, rowsDeleted)); err != nil {
return 0, err
}
}
- if n < int64(count) {
+ if rowsDeleted < int64(count) {
break
}
}
diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go
index 4ff3e0d..47940af 100644
--- a/pkg/icingadb/db.go
+++ b/pkg/icingadb/db.go
@@ -7,13 +7,13 @@ import (
"github.com/icinga/icingadb/pkg/backoff"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/contracts"
- "github.com/icinga/icingadb/pkg/driver"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/icinga/icingadb/pkg/retry"
"github.com/icinga/icingadb/pkg/utils"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
+ "go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"reflect"
@@ -54,6 +54,12 @@ type Options struct {
// MaxRowsPerTransaction defines the maximum number of rows per transaction.
// The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism.
MaxRowsPerTransaction int `yaml:"max_rows_per_transaction" default:"8192"`
+
+ // WsrepSyncWait enforces Galera cluster nodes to perform strict cluster-wide causality checks
+ // before executing specific SQL queries determined by the number you provided.
+ // Please refer to the below link for a detailed description.
+ // https://icinga.com/docs/icinga-db/latest/doc/03-Configuration/#galera-cluster
+ WsrepSyncWait int `yaml:"wsrep_sync_wait" default:"7"`
}
// Validate checks constraints in the supplied database options and returns an error if they are violated.
@@ -70,6 +76,9 @@ func (o *Options) Validate() error {
if o.MaxRowsPerTransaction < 1 {
return errors.New("max_rows_per_transaction must be at least 1")
}
+ if o.WsrepSyncWait < 0 || o.WsrepSyncWait > 15 {
+ return errors.New("wsrep_sync_wait can only be set to a number between 0 and 15")
+ }
return nil
}
@@ -85,23 +94,35 @@ func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB {
}
const (
- expectedMysqlSchemaVersion = 4
- expectedPostgresSchemaVersion = 2
+ expectedMysqlSchemaVersion = 5
+ expectedPostgresSchemaVersion = 3
)
// CheckSchema asserts the database schema of the expected version being present.
func (db *DB) CheckSchema(ctx context.Context) error {
var expectedDbSchemaVersion uint16
switch db.DriverName() {
- case driver.MySQL:
+ case MySQL:
expectedDbSchemaVersion = expectedMysqlSchemaVersion
- case driver.PostgreSQL:
+ case PostgreSQL:
expectedDbSchemaVersion = expectedPostgresSchemaVersion
}
var version uint16
- err := db.QueryRowxContext(ctx, "SELECT version FROM icingadb_schema ORDER BY id DESC LIMIT 1").Scan(&version)
+ err := retry.WithBackoff(
+ ctx,
+ func(ctx context.Context) (err error) {
+ query := "SELECT version FROM icingadb_schema ORDER BY id DESC LIMIT 1"
+ err = db.QueryRowxContext(ctx, query).Scan(&version)
+ if err != nil {
+ err = internal.CantPerformQuery(err, query)
+ }
+ return
+ },
+ retry.Retryable,
+ backoff.NewExponentialWithJitter(128*time.Millisecond, 1*time.Minute),
+ db.getDefaultRetrySettings())
if err != nil {
return errors.Wrap(err, "can't check database schema version")
}
@@ -161,10 +182,10 @@ func (db *DB) BuildInsertIgnoreStmt(into interface{}) (string, int) {
var clause string
switch db.DriverName() {
- case driver.MySQL:
+ case MySQL:
// MySQL treats UPDATE id = id as a no-op.
clause = fmt.Sprintf(`ON DUPLICATE KEY UPDATE "%s" = "%s"`, columns[0], columns[0])
- case driver.PostgreSQL:
+ case PostgreSQL:
clause = fmt.Sprintf("ON CONFLICT ON CONSTRAINT pk_%s DO NOTHING", table)
}
@@ -224,10 +245,10 @@ func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders in
var clause, setFormat string
switch db.DriverName() {
- case driver.MySQL:
+ case MySQL:
clause = "ON DUPLICATE KEY UPDATE"
setFormat = `"%[1]s" = VALUES("%[1]s")`
- case driver.PostgreSQL:
+ case PostgreSQL:
clause = fmt.Sprintf("ON CONFLICT ON CONSTRAINT pk_%s DO UPDATE SET", table)
setFormat = `"%[1]s" = EXCLUDED."%[1]s"`
}
@@ -338,7 +359,7 @@ func (db *DB) BulkExec(
},
retry.Retryable,
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
- retry.Settings{},
+ db.getDefaultRetrySettings(),
)
}
}(b))
@@ -403,7 +424,7 @@ func (db *DB) NamedBulkExec(
},
retry.Retryable,
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
- retry.Settings{},
+ db.getDefaultRetrySettings(),
)
}
}(b))
@@ -476,7 +497,7 @@ func (db *DB) NamedBulkExecTx(
},
retry.Retryable,
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
- retry.Settings{},
+ db.getDefaultRetrySettings(),
)
}
}(b))
@@ -662,6 +683,25 @@ func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted {
}
}
+func (db *DB) getDefaultRetrySettings() retry.Settings {
+ return retry.Settings{
+ Timeout: retry.DefaultTimeout,
+ OnRetryableError: func(_ time.Duration, _ uint64, err, lastErr error) {
+ if lastErr == nil || err.Error() != lastErr.Error() {
+ db.logger.Warnw("Can't execute query. Retrying", zap.Error(err))
+ }
+ },
+ OnSuccess: func(elapsed time.Duration, attempt uint64, lastErr error) {
+ if attempt > 1 {
+ db.logger.Infow("Query retried successfully after error",
+ zap.Duration("after", elapsed),
+ zap.Uint64("attempts", attempt),
+ zap.NamedError("recovered_error", lastErr))
+ }
+ },
+ }
+}
+
func (db *DB) log(ctx context.Context, query string, counter *com.Counter) periodic.Stopper {
return periodic.Start(ctx, db.logger.Interval(), func(tick periodic.Tick) {
if count := counter.Reset(); count > 0 {
diff --git a/pkg/icingadb/driver.go b/pkg/icingadb/driver.go
new file mode 100644
index 0000000..d564916
--- /dev/null
+++ b/pkg/icingadb/driver.go
@@ -0,0 +1,90 @@
+package icingadb
+
+import (
+ "context"
+ "database/sql/driver"
+ "github.com/icinga/icingadb/pkg/backoff"
+ "github.com/icinga/icingadb/pkg/icingaredis/telemetry"
+ "github.com/icinga/icingadb/pkg/logging"
+ "github.com/icinga/icingadb/pkg/retry"
+ "github.com/pkg/errors"
+ "go.uber.org/zap"
+ "time"
+)
+
+// Driver names as automatically registered in the database/sql package by themselves.
+const (
+ MySQL string = "mysql"
+ PostgreSQL string = "postgres"
+)
+
+type InitConnFunc func(context.Context, driver.Conn) error
+
+// RetryConnector wraps driver.Connector with retry logic.
+type RetryConnector struct {
+ driver.Connector
+
+ logger *logging.Logger
+
+ // initConn can be used to execute post Connect() arbitrary actions.
+ // It will be called after successfully initiated a new connection using the connector's Connect method.
+ initConn InitConnFunc
+}
+
+// NewConnector creates a fully initialized RetryConnector from the given args.
+func NewConnector(c driver.Connector, logger *logging.Logger, init InitConnFunc) *RetryConnector {
+ return &RetryConnector{Connector: c, logger: logger, initConn: init}
+}
+
+// Connect implements part of the driver.Connector interface.
+func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) {
+ var conn driver.Conn
+ err := errors.Wrap(retry.WithBackoff(
+ ctx,
+ func(ctx context.Context) (err error) {
+ conn, err = c.Connector.Connect(ctx)
+ if err == nil && c.initConn != nil {
+ if err = c.initConn(ctx, conn); err != nil {
+ // We're going to retry this, so just don't bother whether Close() fails!
+ _ = conn.Close()
+ }
+ }
+
+ return
+ },
+ retry.Retryable,
+ backoff.NewExponentialWithJitter(128*time.Millisecond, 1*time.Minute),
+ retry.Settings{
+ Timeout: retry.DefaultTimeout,
+ OnRetryableError: func(_ time.Duration, _ uint64, err, lastErr error) {
+ telemetry.UpdateCurrentDbConnErr(err)
+
+ if lastErr == nil || err.Error() != lastErr.Error() {
+ c.logger.Warnw("Can't connect to database. Retrying", zap.Error(err))
+ }
+ },
+ OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) {
+ telemetry.UpdateCurrentDbConnErr(nil)
+
+ if attempt > 1 {
+ c.logger.Infow("Reconnected to database",
+ zap.Duration("after", elapsed), zap.Uint64("attempts", attempt))
+ }
+ },
+ },
+ ), "can't connect to database")
+ return conn, err
+}
+
+// Driver implements part of the driver.Connector interface.
+func (c RetryConnector) Driver() driver.Driver {
+ return c.Connector.Driver()
+}
+
+// MysqlFuncLogger is an adapter that allows ordinary functions to be used as a logger for mysql.SetLogger.
+type MysqlFuncLogger func(v ...interface{})
+
+// Print implements the mysql.Logger interface.
+func (log MysqlFuncLogger) Print(v ...interface{}) {
+ log(v)
+}
diff --git a/pkg/icingadb/dump_signals.go b/pkg/icingadb/dump_signals.go
index bce1aef..2f8b46e 100644
--- a/pkg/icingadb/dump_signals.go
+++ b/pkg/icingadb/dump_signals.go
@@ -2,10 +2,10 @@ package icingadb
import (
"context"
- "github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/logging"
"github.com/pkg/errors"
+ "github.com/redis/go-redis/v9"
"go.uber.org/zap"
"sync"
)
diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go
index 74d3b32..cc32a4b 100644
--- a/pkg/icingadb/ha.go
+++ b/pkg/icingadb/ha.go
@@ -9,7 +9,6 @@ import (
"github.com/icinga/icingadb/internal"
"github.com/icinga/icingadb/pkg/backoff"
"github.com/icinga/icingadb/pkg/com"
- "github.com/icinga/icingadb/pkg/driver"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingaredis"
icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1"
@@ -23,7 +22,10 @@ import (
"time"
)
-var timeout = 60 * time.Second
+// peerTimeout defines the timeout for HA heartbeats, being used to detect absent nodes.
+//
+// Because this timeout relies on icingaredis.Timeout, it is icingaredis.Timeout plus a short grace period.
+const peerTimeout = icingaredis.Timeout + 5*time.Second
type haState struct {
responsibleTsMilli int64
@@ -43,8 +45,8 @@ type HA struct {
heartbeat *icingaredis.Heartbeat
logger *logging.Logger
responsible bool
- handover chan struct{}
- takeover chan struct{}
+ handover chan string
+ takeover chan string
done chan struct{}
errOnce sync.Once
errMu sync.Mutex
@@ -64,8 +66,8 @@ func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger
db: db,
heartbeat: heartbeat,
logger: logger,
- handover: make(chan struct{}),
- takeover: make(chan struct{}),
+ handover: make(chan string),
+ takeover: make(chan string),
done: make(chan struct{}),
}
@@ -107,13 +109,13 @@ func (h *HA) Err() error {
return h.err
}
-// Handover returns a channel with which handovers are signaled.
-func (h *HA) Handover() chan struct{} {
+// Handover returns a channel with which handovers and their reasons are signaled.
+func (h *HA) Handover() chan string {
return h.handover
}
-// Takeover returns a channel with which takeovers are signaled.
-func (h *HA) Takeover() chan struct{} {
+// Takeover returns a channel with which takeovers and their reasons are signaled.
+func (h *HA) Takeover() chan string {
return h.takeover
}
@@ -141,12 +143,24 @@ func (h *HA) controller() {
oldInstancesRemoved := false
- logTicker := time.NewTicker(time.Second * 60)
- defer logTicker.Stop()
- shouldLog := true
+ // Suppress recurring log messages in the realize method to be only logged this often.
+ routineLogTicker := time.NewTicker(5 * time.Minute)
+ defer routineLogTicker.Stop()
+ shouldLogRoutineEvents := true
+
+ // The retry logic in HA is twofold:
+ //
+ // 1) Updating or inserting the instance row based on the current heartbeat must be done within the heartbeat's
+ // expiration time. Therefore, we use a deadline ctx to retry.WithBackoff() in realize() which expires earlier
+ // than our default timeout.
+ // 2) Since we do not want to exit before our default timeout expires, we have to repeat step 1 until it does.
+ retryTimeout := time.NewTimer(retry.DefaultTimeout)
+ defer retryTimeout.Stop()
for {
select {
+ case <-retryTimeout.C:
+ h.abort(errors.New("retry deadline exceeded"))
case m := <-h.heartbeat.Events():
if m != nil {
now := time.Now()
@@ -158,10 +172,15 @@ func (h *HA) controller() {
if tt.After(now.Add(1 * time.Second)) {
h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt))
}
- if tt.Before(now.Add(-1 * timeout)) {
+ if tt.Before(now.Add(-1 * peerTimeout)) {
h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt))
- h.signalHandover()
+
+ h.signalHandover("received heartbeat from the past")
h.realizeLostHeartbeat()
+
+ // Reset retry timeout so that the next iterations have the full amount of time available again.
+ retry.ResetTimeout(retryTimeout, retry.DefaultTimeout)
+
continue
}
s, err := m.Stats().IcingaStatus()
@@ -186,33 +205,28 @@ func (h *HA) controller() {
EntityWithoutChecksum: v1.EntityWithoutChecksum{IdMeta: v1.IdMeta{
Id: envId,
}},
- Name: types.String{
- NullString: sql.NullString{
- String: envId.String(),
- Valid: true,
- },
- },
+ Name: types.MakeString(envId.String()),
}
h.environmentMu.Unlock()
}
select {
- case <-logTicker.C:
- shouldLog = true
+ case <-routineLogTicker.C:
+ shouldLogRoutineEvents = true
default:
}
- var realizeCtx context.Context
- var cancelRealizeCtx context.CancelFunc
- if h.responsible {
- realizeCtx, cancelRealizeCtx = context.WithDeadline(h.ctx, m.ExpiryTime())
- } else {
- realizeCtx, cancelRealizeCtx = context.WithCancel(h.ctx)
- }
- err = h.realize(realizeCtx, s, t, envId, shouldLog)
+ // Ensure that updating/inserting the instance row is completed by the current heartbeat's expiry time.
+ realizeCtx, cancelRealizeCtx := context.WithDeadline(h.ctx, m.ExpiryTime())
+ err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents)
cancelRealizeCtx()
if errors.Is(err, context.DeadlineExceeded) {
- h.signalHandover()
+ h.signalHandover("instance update/insert deadline exceeded heartbeat expiry time")
+
+ // Instance insert/update was not completed by the expiration time of the current heartbeat.
+ // Pass control back to the loop to try again with the next heartbeat,
+ // or exit the loop when the retry timeout has expired. Therefore,
+ // retry timeout is **not** reset here so that retries continue until the timeout has expired.
continue
}
if err != nil {
@@ -224,12 +238,20 @@ func (h *HA) controller() {
oldInstancesRemoved = true
}
- shouldLog = false
+ shouldLogRoutineEvents = false
} else {
h.logger.Error("Lost heartbeat")
- h.signalHandover()
+ h.signalHandover("lost heartbeat")
h.realizeLostHeartbeat()
}
+
+ // Reset retry timeout so that the next iterations have the full amount of time available again.
+ // Don't be surprised by the location of the code,
+ // as it is obvious that the timer is also reset after an error that ends the loop anyway.
+ // But this is the best place to catch all scenarios where the timeout needs to be reset.
+ // And since HA needs quite a bit of refactoring anyway to e.g. return immediately after calling h.abort(),
+ // it's fine to have it here for now.
+ retry.ResetTimeout(retryTimeout, retry.DefaultTimeout)
case <-h.heartbeat.Done():
if err := h.heartbeat.Err(); err != nil {
h.abort(err)
@@ -240,18 +262,34 @@ func (h *HA) controller() {
}
}
-func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, envId types.Binary, shouldLog bool) error {
- var takeover, otherResponsible bool
+// realize a HA cycle triggered by a heartbeat event.
+//
+// shouldLogRoutineEvents indicates if recurrent events should be logged.
+func (h *HA) realize(
+ ctx context.Context,
+ s *icingaredisv1.IcingaStatus,
+ t *types.UnixMilli,
+ envId types.Binary,
+ shouldLogRoutineEvents bool,
+) error {
+ var (
+ takeover string
+ otherResponsible bool
+ )
+
+ if _, ok := ctx.Deadline(); !ok {
+ panic("can't use context w/o deadline in realize()")
+ }
err := retry.WithBackoff(
ctx,
func(ctx context.Context) error {
- takeover = false
+ takeover = ""
otherResponsible = false
isoLvl := sql.LevelSerializable
selectLock := ""
- if h.db.DriverName() == driver.MySQL {
+ if h.db.DriverName() == MySQL {
// The RDBMS may actually be a Percona XtraDB Cluster which doesn't
// support serializable transactions, but only their following equivalent:
isoLvl = sql.LevelRepeatableRead
@@ -264,25 +302,41 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
}
query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance "+
- "WHERE environment_id = ? AND responsible = ? AND id <> ? AND heartbeat > ?") + selectLock
+ "WHERE environment_id = ? AND responsible = ? AND id <> ?") + selectLock
instance := &v1.IcingadbInstance{}
+ errQuery := tx.QueryRowxContext(ctx, query, envId, "y", h.instanceId).StructScan(instance)
+
+ switch {
+ case errQuery == nil:
+ fields := []any{
+ zap.String("instance_id", instance.Id.String()),
+ zap.String("environment", envId.String()),
+ zap.Time("heartbeat", instance.Heartbeat.Time()),
+ zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time())),
+ }
- errQuery := tx.QueryRowxContext(
- ctx, query, envId, "y", h.instanceId, time.Now().Add(-1*timeout).UnixMilli(),
- ).StructScan(instance)
- switch errQuery {
- case nil:
- otherResponsible = true
- if shouldLog {
- h.logger.Infow("Another instance is active",
- zap.String("instance_id", instance.Id.String()),
- zap.String("environment", envId.String()),
- "heartbeat", instance.Heartbeat,
- zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time())))
+ if instance.Heartbeat.Time().Before(time.Now().Add(-1 * peerTimeout)) {
+ takeover = "other instance's heartbeat has expired"
+ h.logger.Debugw("Preparing to take over HA as other instance's heartbeat has expired", fields...)
+ } else {
+ otherResponsible = true
+ if shouldLogRoutineEvents {
+ h.logger.Infow("Another instance is active", fields...)
+ }
+ }
+
+ case errors.Is(errQuery, sql.ErrNoRows):
+ fields := []any{
+ zap.String("instance_id", h.instanceId.String()),
+ zap.String("environment", envId.String())}
+ if !h.responsible {
+ takeover = "no other instance is active"
+ h.logger.Debugw("Preparing to take over HA as no instance is active", fields...)
+ } else if h.responsible && shouldLogRoutineEvents {
+ h.logger.Debugw("Continuing being the active instance", fields...)
}
- case sql.ErrNoRows:
- takeover = true
+
default:
return internal.CantPerformQuery(errQuery, query)
}
@@ -297,7 +351,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
EnvironmentId: envId,
},
Heartbeat: *t,
- Responsible: types.Bool{Bool: takeover || h.responsible, Valid: true},
+ Responsible: types.Bool{Bool: takeover != "" || h.responsible, Valid: true},
EndpointId: s.EndpointId,
Icinga2Version: s.Version,
Icinga2StartTime: s.ProgramStart,
@@ -314,7 +368,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
return internal.CantPerformQuery(err, stmt)
}
- if takeover {
+ if takeover != "" {
stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?")
_, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId)
@@ -330,16 +384,33 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
return nil
},
retry.Retryable,
- backoff.NewExponentialWithJitter(time.Millisecond*256, time.Second*3),
+ backoff.NewExponentialWithJitter(256*time.Millisecond, 3*time.Second),
retry.Settings{
- OnError: func(_ time.Duration, attempt uint64, err, lastErr error) {
+ // Intentionally no timeout is set, as we use a context with a deadline.
+ OnRetryableError: func(_ time.Duration, attempt uint64, err, lastErr error) {
if lastErr == nil || err.Error() != lastErr.Error() {
log := h.logger.Debugw
- if attempt > 2 {
+ if attempt > 3 {
+ log = h.logger.Infow
+ }
+
+ log("Can't update or insert instance. Retrying", zap.Error(err))
+ }
+ },
+ OnSuccess: func(elapsed time.Duration, attempt uint64, lastErr error) {
+ if attempt > 1 {
+ log := h.logger.Debugw
+
+ if attempt > 4 {
+ // We log errors with severity info starting from the fourth attempt, (see above)
+ // so we need to log success with severity info from the fifth attempt.
log = h.logger.Infow
}
- log("Can't update or insert instance. Retrying", zap.Error(err), zap.Uint64("retry count", attempt))
+ log("Instance updated/inserted successfully after error",
+ zap.Duration("after", elapsed),
+ zap.Uint64("attempts", attempt),
+ zap.NamedError("recovered_error", lastErr))
}
},
},
@@ -348,14 +419,14 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
return err
}
- if takeover {
+ if takeover != "" {
// Insert the environment after each heartbeat takeover if it does not already exist in the database
// as the environment may have changed, although this is likely to happen very rarely.
if err := h.insertEnvironment(); err != nil {
return errors.Wrap(err, "can't insert environment")
}
- h.signalTakeover()
+ h.signalTakeover(takeover)
} else if otherResponsible {
if state, _ := h.state.Load(); !state.otherResponsible {
state.otherResponsible = true
@@ -366,6 +437,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
return nil
}
+// realizeLostHeartbeat updates "responsible = n" for this HA into the database.
func (h *HA) realizeLostHeartbeat() {
stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE id = ?")
if _, err := h.db.ExecContext(h.ctx, stmt, "n", h.instanceId); err != nil && !utils.IsContextCanceled(err) {
@@ -399,10 +471,10 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar
select {
case <-h.ctx.Done():
return
- case <-time.After(timeout):
+ case <-time.After(peerTimeout):
query := h.db.Rebind("DELETE FROM icingadb_instance " +
"WHERE id <> ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?")
- heartbeat := types.UnixMilli(time.Now().Add(-timeout))
+ heartbeat := types.UnixMilli(time.Now().Add(-1 * peerTimeout))
result, err := h.db.ExecContext(h.ctx, query, h.instanceId, envId,
s.EndpointId, heartbeat)
if err != nil {
@@ -421,7 +493,8 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar
}
}
-func (h *HA) signalHandover() {
+// signalHandover gives up HA.responsible and notifies the HA.Handover chan.
+func (h *HA) signalHandover(reason string) {
if h.responsible {
h.state.Store(haState{
responsibleTsMilli: time.Now().UnixMilli(),
@@ -430,7 +503,7 @@ func (h *HA) signalHandover() {
})
select {
- case h.handover <- struct{}{}:
+ case h.handover <- reason:
h.responsible = false
case <-h.ctx.Done():
// Noop
@@ -438,7 +511,8 @@ func (h *HA) signalHandover() {
}
}
-func (h *HA) signalTakeover() {
+// signalTakeover claims HA.responsible and notifies the HA.Takeover chan.
+func (h *HA) signalTakeover(reason string) {
if !h.responsible {
h.state.Store(haState{
responsibleTsMilli: time.Now().UnixMilli(),
@@ -447,7 +521,7 @@ func (h *HA) signalTakeover() {
})
select {
- case h.takeover <- struct{}{}:
+ case h.takeover <- reason:
h.responsible = true
case <-h.ctx.Done():
// Noop
diff --git a/pkg/icingadb/history/sla.go b/pkg/icingadb/history/sla.go
index 79d22c7..7c0849e 100644
--- a/pkg/icingadb/history/sla.go
+++ b/pkg/icingadb/history/sla.go
@@ -1,10 +1,10 @@
package history
import (
- "github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/pkg/icingadb/v1/history"
"github.com/icinga/icingadb/pkg/structify"
"github.com/icinga/icingadb/pkg/types"
+ "github.com/redis/go-redis/v9"
"reflect"
)
diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go
index dc8bc61..4be0e71 100644
--- a/pkg/icingadb/history/sync.go
+++ b/pkg/icingadb/history/sync.go
@@ -2,7 +2,6 @@ package history
import (
"context"
- "github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/internal"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/contracts"
@@ -17,6 +16,7 @@ import (
"github.com/icinga/icingadb/pkg/types"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
+ "github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
"reflect"
"sync"
@@ -144,7 +144,11 @@ func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redi
stream := "icinga:history:stream:" + key
for {
select {
- case bulk := <-bulks:
+ case bulk, ok := <-bulks:
+ if !ok {
+ return nil
+ }
+
ids := make([]string, len(bulk))
for i := range bulk {
ids[i] = bulk[i].ID
diff --git a/pkg/icingadb/objectpacker/objectpacker.go b/pkg/icingadb/objectpacker/objectpacker.go
index 9ddfdc8..0152745 100644
--- a/pkg/icingadb/objectpacker/objectpacker.go
+++ b/pkg/icingadb/objectpacker/objectpacker.go
@@ -6,7 +6,6 @@ import (
"fmt"
"github.com/pkg/errors"
"io"
- "io/ioutil"
"reflect"
"sort"
)
@@ -102,7 +101,7 @@ func packValue(in reflect.Value, out io.Writer) error {
// If there aren't any values to pack, ...
if l < 1 {
// ... create one and pack it - panics on disallowed type.
- _ = packValue(reflect.Zero(in.Type().Elem()), ioutil.Discard)
+ _ = packValue(reflect.Zero(in.Type().Elem()), io.Discard)
}
return nil
@@ -140,13 +139,13 @@ func packValue(in reflect.Value, out io.Writer) error {
packedKey = key.Slice(0, key.Len()).Interface().([]byte)
} else {
// Not just stringify the key (below), but also pack it (here) - panics on disallowed type.
- _ = packValue(iter.Key(), ioutil.Discard)
+ _ = packValue(iter.Key(), io.Discard)
packedKey = []byte(fmt.Sprint(key.Interface()))
}
} else {
// Not just stringify the key (below), but also pack it (here) - panics on disallowed type.
- _ = packValue(iter.Key(), ioutil.Discard)
+ _ = packValue(iter.Key(), io.Discard)
packedKey = []byte(fmt.Sprint(key.Interface()))
}
@@ -176,8 +175,8 @@ func packValue(in reflect.Value, out io.Writer) error {
typ := in.Type()
// ... create one and pack it - panics on disallowed type.
- _ = packValue(reflect.Zero(typ.Key()), ioutil.Discard)
- _ = packValue(reflect.Zero(typ.Elem()), ioutil.Discard)
+ _ = packValue(reflect.Zero(typ.Key()), io.Discard)
+ _ = packValue(reflect.Zero(typ.Elem()), io.Discard)
}
return nil
@@ -186,7 +185,7 @@ func packValue(in reflect.Value, out io.Writer) error {
err := packValue(reflect.Value{}, out)
// Create a fictive referenced value and pack it - panics on disallowed type.
- _ = packValue(reflect.Zero(in.Type().Elem()), ioutil.Discard)
+ _ = packValue(reflect.Zero(in.Type().Elem()), io.Discard)
return err
} else {
diff --git a/pkg/icingadb/overdue/sync.go b/pkg/icingadb/overdue/sync.go
index 5cd4d67..377592a 100644
--- a/pkg/icingadb/overdue/sync.go
+++ b/pkg/icingadb/overdue/sync.go
@@ -4,7 +4,6 @@ import (
"context"
_ "embed"
"fmt"
- "github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/icinga/icingadb/internal"
"github.com/icinga/icingadb/pkg/com"
@@ -17,6 +16,7 @@ import (
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/pkg/errors"
+ "github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
"regexp"
"strconv"
diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go
index dfee9c0..a56263a 100644
--- a/pkg/icingadb/runtime_updates.go
+++ b/pkg/icingadb/runtime_updates.go
@@ -3,7 +3,6 @@ package icingadb
import (
"context"
"fmt"
- "github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
@@ -15,6 +14,7 @@ import (
"github.com/icinga/icingadb/pkg/structify"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
+ "github.com/redis/go-redis/v9"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
diff --git a/pkg/icingadb/v1/checkable.go b/pkg/icingadb/v1/checkable.go
index dbb114c..4b1efeb 100644
--- a/pkg/icingadb/v1/checkable.go
+++ b/pkg/icingadb/v1/checkable.go
@@ -30,7 +30,7 @@ type Checkable struct {
IconImageAlt string `json:"icon_image_alt"`
IconImageId types.Binary `json:"icon_image_id"`
IsVolatile types.Bool `json:"is_volatile"`
- MaxCheckAttempts float64 `json:"max_check_attempts"`
+ MaxCheckAttempts uint32 `json:"max_check_attempts"`
Notes string `json:"notes"`
NotesUrlId types.Binary `json:"notes_url_id"`
NotificationsEnabled types.Bool `json:"notifications_enabled"`
diff --git a/pkg/icingadb/v1/history/state.go b/pkg/icingadb/v1/history/state.go
index dec13b0..6320b73 100644
--- a/pkg/icingadb/v1/history/state.go
+++ b/pkg/icingadb/v1/history/state.go
@@ -14,7 +14,7 @@ type StateHistory struct {
HardState uint8 `json:"hard_state"`
PreviousSoftState uint8 `json:"previous_soft_state"`
PreviousHardState uint8 `json:"previous_hard_state"`
- CheckAttempt uint8 `json:"check_attempt"`
+ CheckAttempt uint32 `json:"check_attempt"`
Output types.String `json:"output"`
LongOutput types.String `json:"long_output"`
MaxCheckAttempts uint32 `json:"max_check_attempts"`
diff --git a/pkg/icingadb/v1/state.go b/pkg/icingadb/v1/state.go
index bad8f28..983b14d 100644
--- a/pkg/icingadb/v1/state.go
+++ b/pkg/icingadb/v1/state.go
@@ -9,7 +9,7 @@ type State struct {
EnvironmentMeta `json:",inline"`
AcknowledgementCommentId types.Binary `json:"acknowledgement_comment_id"`
LastCommentId types.Binary `json:"last_comment_id"`
- CheckAttempt uint8 `json:"check_attempt"`
+ CheckAttempt uint32 `json:"check_attempt"`
CheckCommandline types.String `json:"check_commandline"`
CheckSource types.String `json:"check_source"`
SchedulingSource types.String `json:"scheduling_source"`
diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go
index d42713c..c494f95 100644
--- a/pkg/icingaredis/client.go
+++ b/pkg/icingaredis/client.go
@@ -2,7 +2,6 @@ package icingaredis
import (
"context"
- "github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
@@ -10,6 +9,7 @@ import (
"github.com/icinga/icingadb/pkg/periodic"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
+ "github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"runtime"
diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go
index 9a8ebad..cb34010 100644
--- a/pkg/icingaredis/heartbeat.go
+++ b/pkg/icingaredis/heartbeat.go
@@ -2,13 +2,13 @@ package icingaredis
import (
"context"
- "github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/internal"
v1 "github.com/icinga/icingadb/pkg/icingaredis/v1"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/types"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
+ "github.com/redis/go-redis/v9"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"sync"
@@ -16,9 +16,9 @@ import (
"time"
)
-// timeout defines how long a heartbeat may be absent if a heartbeat has already been received.
+// Timeout defines how long a heartbeat may be absent if a heartbeat has already been received.
// After this time, a heartbeat loss is propagated.
-var timeout = 60 * time.Second
+const Timeout = time.Minute
// Heartbeat periodically reads heartbeats from a Redis stream and signals in Beat channels when they are received.
// Also signals on if the heartbeat is Lost.
@@ -97,7 +97,7 @@ func (h *Heartbeat) controller(ctx context.Context) {
// Message producer loop.
g.Go(func() error {
// We expect heartbeats every second but only read them every 3 seconds.
- throttle := time.NewTicker(time.Second * 3)
+ throttle := time.NewTicker(3 * time.Second)
defer throttle.Stop()
for id := "$"; ; {
@@ -141,9 +141,9 @@ func (h *Heartbeat) controller(ctx context.Context) {
atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli())
h.sendEvent(m)
- case <-time.After(timeout):
+ case <-time.After(Timeout):
if h.active {
- h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", timeout))
+ h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", Timeout))
h.sendEvent(nil)
h.active = false
} else {
@@ -217,5 +217,5 @@ func (m *HeartbeatMessage) EnvironmentID() (types.Binary, error) {
// ExpiryTime returns the timestamp when the heartbeat expires.
func (m *HeartbeatMessage) ExpiryTime() time.Time {
- return m.received.Add(timeout)
+ return m.received.Add(Timeout)
}
diff --git a/pkg/icingaredis/telemetry/heartbeat.go b/pkg/icingaredis/telemetry/heartbeat.go
index ee476a1..0057ae0 100644
--- a/pkg/icingaredis/telemetry/heartbeat.go
+++ b/pkg/icingaredis/telemetry/heartbeat.go
@@ -3,7 +3,6 @@ package telemetry
import (
"context"
"fmt"
- "github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/internal"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/icingaredis"
@@ -11,6 +10,7 @@ import (
"github.com/icinga/icingadb/pkg/periodic"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
+ "github.com/redis/go-redis/v9"
"go.uber.org/zap"
"regexp"
"runtime/metrics"
diff --git a/pkg/icingaredis/telemetry/stats.go b/pkg/icingaredis/telemetry/stats.go
index 86db0b3..2b592a5 100644
--- a/pkg/icingaredis/telemetry/stats.go
+++ b/pkg/icingaredis/telemetry/stats.go
@@ -2,12 +2,12 @@ package telemetry
import (
"context"
- "github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/icinga/icingadb/pkg/utils"
+ "github.com/redis/go-redis/v9"
"go.uber.org/zap"
"strconv"
"time"
diff --git a/pkg/icingaredis/utils.go b/pkg/icingaredis/utils.go
index 9176dba..50c97f9 100644
--- a/pkg/icingaredis/utils.go
+++ b/pkg/icingaredis/utils.go
@@ -2,13 +2,13 @@ package icingaredis
import (
"context"
- "github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/internal"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/types"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
+ "github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
)
diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go
index da73943..e5b93de 100644
--- a/pkg/retry/retry.go
+++ b/pkg/retry/retry.go
@@ -7,12 +7,15 @@ import (
"github.com/icinga/icingadb/pkg/backoff"
"github.com/lib/pq"
"github.com/pkg/errors"
+ "io"
"net"
- "strings"
"syscall"
"time"
)
+// DefaultTimeout is our opinionated default timeout for retrying database and Redis operations.
+const DefaultTimeout = 5 * time.Minute
+
// RetryableFunc is a retryable function.
type RetryableFunc func(context.Context) error
@@ -21,10 +24,15 @@ type IsRetryable func(error) bool
// Settings aggregates optional settings for WithBackoff.
type Settings struct {
- // Timeout lets WithBackoff give up once elapsed (if >0).
+ // If >0, Timeout lets WithBackoff stop retrying gracefully once elapsed based on the following criteria:
+ // * If the execution of RetryableFunc has taken longer than Timeout, no further attempts are made.
+ // * If Timeout elapses during the sleep phase between retries, one final retry is attempted.
+ // * RetryableFunc is always granted its full execution time and is not canceled if it exceeds Timeout.
+ // This means that WithBackoff may not stop exactly after Timeout expires,
+ // or may not retry at all if the first execution of RetryableFunc already takes longer than Timeout.
Timeout time.Duration
- // OnError is called if an error occurs.
- OnError func(elapsed time.Duration, attempt uint64, err, lastErr error)
+ // OnRetryableError is called if a retryable error occurs.
+ OnRetryableError func(elapsed time.Duration, attempt uint64, err, lastErr error)
// OnSuccess is called once the operation succeeds.
OnSuccess func(elapsed time.Duration, attempt uint64, lastErr error)
}
@@ -34,16 +42,19 @@ type Settings struct {
func WithBackoff(
ctx context.Context, retryableFunc RetryableFunc, retryable IsRetryable, b backoff.Backoff, settings Settings,
) (err error) {
- parentCtx := ctx
+ // Channel for retry deadline, which is set to the channel of NewTimer() if a timeout is configured,
+ // otherwise nil, so that it blocks forever if there is no timeout.
+ var timeout <-chan time.Time
if settings.Timeout > 0 {
- var cancelCtx context.CancelFunc
- ctx, cancelCtx = context.WithTimeout(ctx, settings.Timeout)
- defer cancelCtx()
+ t := time.NewTimer(settings.Timeout)
+ defer t.Stop()
+ timeout = t.C
}
start := time.Now()
- for attempt := uint64(0); ; /* true */ attempt++ {
+ timedOut := false
+ for attempt := uint64(1); ; /* true */ attempt++ {
prevErr := err
if err = retryableFunc(ctx); err == nil {
@@ -54,43 +65,72 @@ func WithBackoff(
return
}
- if settings.OnError != nil {
- settings.OnError(time.Since(start), attempt, err, prevErr)
+ // Retryable function may have exited prematurely due to context errors.
+ // We explicitly check the context error here, as the error returned by the retryable function can pass the
+ // error.Is() checks even though it is not a real context error, e.g.
+ // https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/net/net.go;l=422
+ // https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/net/net.go;l=601
+ if errors.Is(ctx.Err(), context.DeadlineExceeded) || errors.Is(ctx.Err(), context.Canceled) {
+ if prevErr != nil {
+ err = errors.Wrap(err, prevErr.Error())
+ }
+
+ return
}
- isRetryable := retryable(err)
+ if !retryable(err) {
+ err = errors.Wrap(err, "can't retry")
- if prevErr != nil && (errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) {
- err = prevErr
+ return
}
- if !isRetryable {
- err = errors.Wrap(err, "can't retry")
+ select {
+ case <-timeout:
+ // Stop retrying immediately if executing the retryable function took longer than the timeout.
+ timedOut = true
+ default:
+ }
+
+ if timedOut {
+ err = errors.Wrap(err, "retry deadline exceeded")
return
}
- sleep := b(attempt)
+ if settings.OnRetryableError != nil {
+ settings.OnRetryableError(time.Since(start), attempt, err, prevErr)
+ }
+
select {
+ case <-time.After(b(attempt)):
+ case <-timeout:
+ // Do not stop retrying immediately, but start one last attempt to mitigate timing issues where
+ // the timeout expires while waiting for the next attempt and
+ // therefore no retries have happened during this possibly long period.
+ timedOut = true
case <-ctx.Done():
- if outerErr := parentCtx.Err(); outerErr != nil {
- err = errors.Wrap(outerErr, "outer context canceled")
- } else {
- if err == nil {
- err = ctx.Err()
- }
- err = errors.Wrap(err, "can't retry")
- }
+ err = errors.Wrap(ctx.Err(), err.Error())
return
- case <-time.After(sleep):
}
}
}
+// ResetTimeout changes the possibly expired timer t to expire after duration d.
+//
+// If the timer has already expired and nothing has been received from its channel,
+// it is automatically drained as if the timer had never expired.
+func ResetTimeout(t *time.Timer, d time.Duration) {
+ if !t.Stop() {
+ <-t.C
+ }
+
+ t.Reset(d)
+}
+
// Retryable returns true for common errors that are considered retryable,
// i.e. temporary, timeout, DNS, connection refused and reset, host down and unreachable and
-// network down and unreachable errors.
+// network down and unreachable errors. In addition, any database error is considered retryable.
func Retryable(err error) bool {
var temporary interface {
Temporary() bool
@@ -133,6 +173,12 @@ func Retryable(err error) bool {
if errors.Is(err, syscall.ENETDOWN) || errors.Is(err, syscall.ENETUNREACH) {
return true
}
+ if errors.Is(err, syscall.EPIPE) {
+ return true
+ }
+ if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
+ return true
+ }
if errors.Is(err, driver.ErrBadConn) {
return true
@@ -141,43 +187,10 @@ func Retryable(err error) bool {
return true
}
- var e *mysql.MySQLError
- if errors.As(err, &e) {
- switch e.Number {
- case 1053, 1205, 1213, 2006:
- // 1053: Server shutdown in progress
- // 1205: Lock wait timeout
- // 1213: Deadlock found when trying to get lock
- // 2006: MySQL server has gone away
- return true
- default:
- return false
- }
- }
-
- var pe *pq.Error
- if errors.As(err, &pe) {
- switch pe.Code {
- case "08000", // connection_exception
- "08006", // connection_failure
- "08001", // sqlclient_unable_to_establish_sqlconnection
- "08004", // sqlserver_rejected_establishment_of_sqlconnection
- "40001", // serialization_failure
- "40P01", // deadlock_detected
- "54000", // program_limit_exceeded
- "55006", // object_in_use
- "55P03", // lock_not_available
- "57P01", // admin_shutdown
- "57P02", // crash_shutdown
- "57P03", // cannot_connect_now
- "58000", // system_error
- "58030", // io_error
- "XX000": // internal_error
- return true
- default:
- // Class 53 - Insufficient Resources
- return strings.HasPrefix(string(pe.Code), "53")
- }
+ var mye *mysql.MySQLError
+ var pqe *pq.Error
+ if errors.As(err, &mye) || errors.As(err, &pqe) {
+ return true
}
return false
diff --git a/pkg/types/string.go b/pkg/types/string.go
index f8ead45..ce2a4ac 100644
--- a/pkg/types/string.go
+++ b/pkg/types/string.go
@@ -15,6 +15,14 @@ type String struct {
sql.NullString
}
+// MakeString constructs a new non-NULL String from s.
+func MakeString(s string) String {
+ return String{sql.NullString{
+ String: s,
+ Valid: true,
+ }}
+}
+
// MarshalJSON implements the json.Marshaler interface.
// Supports JSON null.
func (s String) MarshalJSON() ([]byte, error) {