diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 11:40:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 11:40:59 +0000 |
commit | bc4e624732bd51c0dd1e9529cf228e8c23127732 (patch) | |
tree | d95dab8960e9d02d3b95f8653074ad2e54ca207c /pkg | |
parent | Initial commit. (diff) | |
download | icingadb-bc4e624732bd51c0dd1e9529cf228e8c23127732.tar.xz icingadb-bc4e624732bd51c0dd1e9529cf228e8c23127732.zip |
Adding upstream version 1.1.1.upstream/1.1.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
93 files changed, 10190 insertions, 0 deletions
diff --git a/pkg/backoff/backoff.go b/pkg/backoff/backoff.go new file mode 100644 index 0000000..6ce7bee --- /dev/null +++ b/pkg/backoff/backoff.go @@ -0,0 +1,43 @@ +package backoff + +import ( + "math/rand" + "time" +) + +// Backoff returns the backoff duration for a specific retry attempt. +type Backoff func(uint64) time.Duration + +// NewExponentialWithJitter returns a backoff implementation that +// exponentially increases the backoff duration for each retry from min, +// never exceeding max. Some randomization is added to the backoff duration. +// It panics if min >= max. +func NewExponentialWithJitter(min, max time.Duration) Backoff { + if min <= 0 { + min = time.Millisecond * 100 + } + if max <= 0 { + max = time.Second * 10 + } + if min >= max { + panic("max must be larger than min") + } + + return func(attempt uint64) time.Duration { + e := min << attempt + if e <= 0 || e > max { + e = max + } + + return time.Duration(jitter(int64(e))) + } +} + +// jitter returns a random integer distributed in the range [n/2..n). +func jitter(n int64) int64 { + if n == 0 { + return 0 + } + + return n/2 + rand.Int63n(n/2) +} diff --git a/pkg/com/atomic.go b/pkg/com/atomic.go new file mode 100644 index 0000000..316413d --- /dev/null +++ b/pkg/com/atomic.go @@ -0,0 +1,38 @@ +package com + +import "sync/atomic" + +// Atomic is a type-safe wrapper around atomic.Value. +type Atomic[T any] struct { + v atomic.Value +} + +func (a *Atomic[T]) Load() (_ T, ok bool) { + if v, ok := a.v.Load().(box[T]); ok { + return v.v, true + } + + return +} + +func (a *Atomic[T]) Store(v T) { + a.v.Store(box[T]{v}) +} + +func (a *Atomic[T]) Swap(new T) (old T, ok bool) { + if old, ok := a.v.Swap(box[T]{new}).(box[T]); ok { + return old.v, true + } + + return +} + +func (a *Atomic[T]) CompareAndSwap(old, new T) (swapped bool) { + return a.v.CompareAndSwap(box[T]{old}, box[T]{new}) +} + +// box allows, for the case T is an interface, nil values and values of different specific types implementing T +// to be stored in Atomic[T]#v (bypassing atomic.Value#Store()'s policy) by wrapping it (into a non-interface). +type box[T any] struct { + v T +} diff --git a/pkg/com/bulker.go b/pkg/com/bulker.go new file mode 100644 index 0000000..5e8de52 --- /dev/null +++ b/pkg/com/bulker.go @@ -0,0 +1,187 @@ +package com + +import ( + "context" + "github.com/icinga/icingadb/pkg/contracts" + "golang.org/x/sync/errgroup" + "sync" + "time" +) + +// BulkChunkSplitPolicy is a state machine which tracks the items of a chunk a bulker assembles. +// A call takes an item for the current chunk into account. +// Output true indicates that the state machine was reset first and the bulker +// shall finish the current chunk now (not e.g. once $size is reached) without the given item. +type BulkChunkSplitPolicy[T any] func(T) bool + +type BulkChunkSplitPolicyFactory[T any] func() BulkChunkSplitPolicy[T] + +// NeverSplit returns a pseudo state machine which never demands splitting. +func NeverSplit[T any]() BulkChunkSplitPolicy[T] { + return neverSplit[T] +} + +// SplitOnDupId returns a state machine which tracks the inputs' IDs. +// Once an already seen input arrives, it demands splitting. +func SplitOnDupId[T contracts.IDer]() BulkChunkSplitPolicy[T] { + seenIds := map[string]struct{}{} + + return func(ider T) bool { + id := ider.ID().String() + + _, ok := seenIds[id] + if ok { + seenIds = map[string]struct{}{id: {}} + } else { + seenIds[id] = struct{}{} + } + + return ok + } +} + +func neverSplit[T any](T) bool { + return false +} + +// Bulker reads all values from a channel and streams them in chunks into a Bulk channel. +type Bulker[T any] struct { + ch chan []T + ctx context.Context + mu sync.Mutex +} + +// NewBulker returns a new Bulker and starts streaming. +func NewBulker[T any]( + ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T], +) *Bulker[T] { + b := &Bulker[T]{ + ch: make(chan []T), + ctx: ctx, + mu: sync.Mutex{}, + } + + go b.run(ch, count, splitPolicyFactory) + + return b +} + +// Bulk returns the channel on which the bulks are delivered. +func (b *Bulker[T]) Bulk() <-chan []T { + return b.ch +} + +func (b *Bulker[T]) run(ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T]) { + defer close(b.ch) + + bufCh := make(chan T, count) + splitPolicy := splitPolicyFactory() + g, ctx := errgroup.WithContext(b.ctx) + + g.Go(func() error { + defer close(bufCh) + + for { + select { + case v, ok := <-ch: + if !ok { + return nil + } + + bufCh <- v + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + g.Go(func() error { + for done := false; !done; { + buf := make([]T, 0, count) + timeout := time.After(256 * time.Millisecond) + + for drain := true; drain && len(buf) < count; { + select { + case v, ok := <-bufCh: + if !ok { + drain = false + done = true + + break + } + + if splitPolicy(v) { + if len(buf) > 0 { + b.ch <- buf + buf = make([]T, 0, count) + } + + timeout = time.After(256 * time.Millisecond) + } + + buf = append(buf, v) + case <-timeout: + drain = false + case <-ctx.Done(): + return ctx.Err() + } + } + + if len(buf) > 0 { + b.ch <- buf + } + + splitPolicy = splitPolicyFactory() + } + + return nil + }) + + // We don't expect an error here. + // We only use errgroup for the encapsulated use of sync.WaitGroup. + _ = g.Wait() +} + +// Bulk reads all values from a channel and streams them in chunks into a returned channel. +func Bulk[T any]( + ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T], +) <-chan []T { + if count <= 1 { + return oneBulk(ctx, ch) + } + + return NewBulker(ctx, ch, count, splitPolicyFactory).Bulk() +} + +// oneBulk operates just as NewBulker(ctx, ch, 1, splitPolicy).Bulk(), +// but without the overhead of the actual bulk creation with a buffer channel, timeout and BulkChunkSplitPolicy. +func oneBulk[T any](ctx context.Context, ch <-chan T) <-chan []T { + out := make(chan []T) + go func() { + defer close(out) + + for { + select { + case item, ok := <-ch: + if !ok { + return + } + + select { + case out <- []T{item}: + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } + }() + + return out +} + +var ( + _ BulkChunkSplitPolicyFactory[struct{}] = NeverSplit[struct{}] + _ BulkChunkSplitPolicyFactory[contracts.Entity] = SplitOnDupId[contracts.Entity] +) diff --git a/pkg/com/com.go b/pkg/com/com.go new file mode 100644 index 0000000..9e0a698 --- /dev/null +++ b/pkg/com/com.go @@ -0,0 +1,82 @@ +package com + +import ( + "context" + "github.com/icinga/icingadb/pkg/contracts" + "golang.org/x/sync/errgroup" +) + +// WaitAsync calls Wait() on the passed Waiter in a new goroutine and +// sends the first non-nil error (if any) to the returned channel. +// The returned channel is always closed when the Waiter is done. +func WaitAsync(w contracts.Waiter) <-chan error { + errs := make(chan error, 1) + + go func() { + defer close(errs) + + if e := w.Wait(); e != nil { + errs <- e + } + }() + + return errs +} + +// ErrgroupReceive adds a goroutine to the specified group that +// returns the first non-nil error (if any) from the specified channel. +// If the channel is closed, it will return nil. +func ErrgroupReceive(g *errgroup.Group, err <-chan error) { + g.Go(func() error { + if e := <-err; e != nil { + return e + } + + return nil + }) +} + +// CopyFirst asynchronously forwards all items from input to forward and synchronously returns the first item. +func CopyFirst( + ctx context.Context, input <-chan contracts.Entity, +) (first contracts.Entity, forward <-chan contracts.Entity, err error) { + var ok bool + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + case first, ok = <-input: + } + + if !ok { + return + } + + // Buffer of one because we receive an entity and send it back immediately. + fwd := make(chan contracts.Entity, 1) + fwd <- first + + forward = fwd + + go func() { + defer close(fwd) + + for { + select { + case <-ctx.Done(): + return + case e, ok := <-input: + if !ok { + return + } + + select { + case <-ctx.Done(): + return + case fwd <- e: + } + } + } + }() + + return +} diff --git a/pkg/com/cond.go b/pkg/com/cond.go new file mode 100644 index 0000000..72ba347 --- /dev/null +++ b/pkg/com/cond.go @@ -0,0 +1,90 @@ +package com + +import ( + "context" + "github.com/pkg/errors" +) + +// Cond implements a channel-based synchronization for goroutines that wait for signals or send them. +// Internally based on a controller loop that handles the synchronization of new listeners and signal propagation, +// which is only started when NewCond is called. Thus the zero value cannot be used. +type Cond struct { + broadcast chan struct{} + done chan struct{} + cancel context.CancelFunc + listeners chan chan struct{} +} + +// NewCond returns a new Cond and starts the controller loop. +func NewCond(ctx context.Context) *Cond { + ctx, cancel := context.WithCancel(ctx) + + c := &Cond{ + broadcast: make(chan struct{}), + cancel: cancel, + done: make(chan struct{}), + listeners: make(chan chan struct{}), + } + + go c.controller(ctx) + + return c +} + +// Broadcast sends a signal to all current listeners by closing the previously returned channel from Wait. +// Panics if the controller loop has already ended. +func (c *Cond) Broadcast() { + select { + case c.broadcast <- struct{}{}: + case <-c.done: + panic(errors.New("condition closed")) + } +} + +// Close stops the controller loop, waits for it to finish, and returns an error if any. +// Implements the io.Closer interface. +func (c *Cond) Close() error { + c.cancel() + <-c.done + + return nil +} + +// Done returns a channel that will be closed when the controller loop has ended. +func (c *Cond) Done() <-chan struct{} { + return c.done +} + +// Wait returns a channel that is closed with the next signal. +// Panics if the controller loop has already ended. +func (c *Cond) Wait() <-chan struct{} { + select { + case l := <-c.listeners: + return l + case <-c.done: + panic(errors.New("condition closed")) + } +} + +// controller loop. +func (c *Cond) controller(ctx context.Context) { + defer close(c.done) + + // Note that the notify channel does not close when the controller loop ends + // in order not to notify pending listeners. + notify := make(chan struct{}) + + for { + select { + case <-c.broadcast: + // Close channel to notify all current listeners. + close(notify) + // Create a new channel for the next listeners. + notify = make(chan struct{}) + case c.listeners <- notify: + // A new listener received the channel. + case <-ctx.Done(): + return + } + } +} diff --git a/pkg/com/counter.go b/pkg/com/counter.go new file mode 100644 index 0000000..52f9f7f --- /dev/null +++ b/pkg/com/counter.go @@ -0,0 +1,48 @@ +package com + +import ( + "sync" + "sync/atomic" +) + +// Counter implements an atomic counter. +type Counter struct { + value uint64 + mu sync.Mutex // Protects total. + total uint64 +} + +// Add adds the given delta to the counter. +func (c *Counter) Add(delta uint64) { + atomic.AddUint64(&c.value, delta) +} + +// Inc increments the counter by one. +func (c *Counter) Inc() { + c.Add(1) +} + +// Reset resets the counter to 0 and returns its previous value. +// Does not reset the total value returned from Total. +func (c *Counter) Reset() uint64 { + c.mu.Lock() + defer c.mu.Unlock() + + v := atomic.SwapUint64(&c.value, 0) + c.total += v + + return v +} + +// Total returns the total counter value. +func (c *Counter) Total() uint64 { + c.mu.Lock() + defer c.mu.Unlock() + + return c.total + c.Val() +} + +// Val returns the current counter value. +func (c *Counter) Val() uint64 { + return atomic.LoadUint64(&c.value) +} diff --git a/pkg/common/sync_subject.go b/pkg/common/sync_subject.go new file mode 100644 index 0000000..a39d6df --- /dev/null +++ b/pkg/common/sync_subject.go @@ -0,0 +1,71 @@ +package common + +import ( + "github.com/icinga/icingadb/pkg/contracts" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/utils" +) + +// SyncSubject defines information about entities to be synchronized. +type SyncSubject struct { + entity contracts.Entity + factory contracts.EntityFactoryFunc + withChecksum bool +} + +// NewSyncSubject returns a new SyncSubject. +func NewSyncSubject(factoryFunc contracts.EntityFactoryFunc) *SyncSubject { + e := factoryFunc() + + var factory contracts.EntityFactoryFunc + if _, ok := e.(contracts.Initer); ok { + factory = func() contracts.Entity { + e := factoryFunc() + e.(contracts.Initer).Init() + + return e + } + } else { + factory = factoryFunc + } + + _, withChecksum := e.(contracts.Checksumer) + + return &SyncSubject{ + entity: e, + factory: factory, + withChecksum: withChecksum, + } +} + +// Entity returns one value from the factory. Always returns the same entity. +func (s SyncSubject) Entity() contracts.Entity { + return s.entity +} + +// Factory returns the entity factory function that calls Init() on the created contracts.Entity if applicable. +func (s SyncSubject) Factory() contracts.EntityFactoryFunc { + return s.factory +} + +// FactoryForDelta behaves like Factory() unless s is WithChecksum(). +// In the latter case it returns a factory for EntityWithChecksum instead. +// Rationale: Sync#ApplyDelta() uses its input entities which are WithChecksum() only for the delta itself +// and not for insertion into the database, so EntityWithChecksum is enough. And it consumes less memory. +func (s SyncSubject) FactoryForDelta() contracts.EntityFactoryFunc { + if s.withChecksum { + return v1.NewEntityWithChecksum + } + + return s.factory +} + +// Name returns the declared name of the entity. +func (s SyncSubject) Name() string { + return utils.Name(s.entity) +} + +// WithChecksum returns whether entities from the factory implement contracts.Checksumer. +func (s SyncSubject) WithChecksum() bool { + return s.withChecksum +} diff --git a/pkg/config/config.go b/pkg/config/config.go new file mode 100644 index 0000000..a683014 --- /dev/null +++ b/pkg/config/config.go @@ -0,0 +1,146 @@ +package config + +import ( + "bytes" + "crypto/tls" + "crypto/x509" + "fmt" + "github.com/creasty/defaults" + "github.com/goccy/go-yaml" + "github.com/jessevdk/go-flags" + "github.com/pkg/errors" + "io/ioutil" + "os" +) + +// Config defines Icinga DB config. +type Config struct { + Database Database `yaml:"database"` + Redis Redis `yaml:"redis"` + Logging Logging `yaml:"logging"` + Retention Retention `yaml:"retention"` + + DecodeWarning error `yaml:"-"` +} + +// Validate checks constraints in the supplied configuration and returns an error if they are violated. +func (c *Config) Validate() error { + if err := c.Database.Validate(); err != nil { + return err + } + if err := c.Redis.Validate(); err != nil { + return err + } + if err := c.Logging.Validate(); err != nil { + return err + } + if err := c.Retention.Validate(); err != nil { + return err + } + + return nil +} + +// Flags defines CLI flags. +type Flags struct { + // Version decides whether to just print the version and exit. + Version bool `long:"version" description:"print version and exit"` + // Config is the path to the config file + Config string `short:"c" long:"config" description:"path to config file" required:"true" default:"/etc/icingadb/config.yml"` +} + +// FromYAMLFile returns a new Config value created from the given YAML config file. +func FromYAMLFile(name string) (*Config, error) { + f, err := os.ReadFile(name) + if err != nil { + return nil, errors.Wrap(err, "can't read YAML file "+name) + } + + c := &Config{} + d := yaml.NewDecoder(bytes.NewReader(f)) + + if err := defaults.Set(c); err != nil { + return nil, errors.Wrap(err, "can't set config defaults") + } + + if err := d.Decode(c); err != nil { + return nil, errors.Wrap(err, "can't parse YAML file "+name) + } + + // Decode again with yaml.DisallowUnknownField() (like v1.2 will do) and issue a warning if it returns an error. + c.DecodeWarning = yaml.NewDecoder(bytes.NewReader(f), yaml.DisallowUnknownField()).Decode(&Config{}) + + if err := c.Validate(); err != nil { + const msg = "invalid configuration" + if warn := c.DecodeWarning; warn != nil { + return nil, fmt.Errorf("%s: %w\n\nwarning: ignored unknown config option:\n\n%v", msg, err, warn) + } else { + return nil, errors.Wrap(err, msg) + } + } + + return c, nil +} + +// ParseFlags parses CLI flags and +// returns a Flags value created from them. +func ParseFlags() (*Flags, error) { + f := &Flags{} + parser := flags.NewParser(f, flags.Default) + + if _, err := parser.Parse(); err != nil { + return nil, errors.Wrap(err, "can't parse CLI flags") + } + + return f, nil +} + +// TLS provides TLS configuration options for Redis and Database. +type TLS struct { + Enable bool `yaml:"tls"` + Cert string `yaml:"cert"` + Key string `yaml:"key"` + Ca string `yaml:"ca"` + Insecure bool `yaml:"insecure"` +} + +// MakeConfig assembles a tls.Config from t and serverName. +func (t *TLS) MakeConfig(serverName string) (*tls.Config, error) { + if !t.Enable { + return nil, nil + } + + tlsConfig := &tls.Config{} + if t.Cert == "" { + if t.Key != "" { + return nil, errors.New("private key given, but client certificate missing") + } + } else if t.Key == "" { + return nil, errors.New("client certificate given, but private key missing") + } else { + crt, err := tls.LoadX509KeyPair(t.Cert, t.Key) + if err != nil { + return nil, errors.Wrap(err, "can't load X.509 key pair") + } + + tlsConfig.Certificates = []tls.Certificate{crt} + } + + if t.Insecure { + tlsConfig.InsecureSkipVerify = true + } else if t.Ca != "" { + raw, err := ioutil.ReadFile(t.Ca) + if err != nil { + return nil, errors.Wrap(err, "can't read CA file") + } + + tlsConfig.RootCAs = x509.NewCertPool() + if !tlsConfig.RootCAs.AppendCertsFromPEM(raw) { + return nil, errors.New("can't parse CA file") + } + } + + tlsConfig.ServerName = serverName + + return tlsConfig, nil +} diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go new file mode 100644 index 0000000..2418094 --- /dev/null +++ b/pkg/config/config_test.go @@ -0,0 +1,77 @@ +package config + +import ( + "github.com/creasty/defaults" + "github.com/icinga/icingadb/pkg/logging" + "github.com/stretchr/testify/require" + "os" + "testing" +) + +func TestFromYAMLFile(t *testing.T) { + const miniConf = ` +database: + host: 192.0.2.1 + database: icingadb + user: icingadb + password: icingadb + +redis: + host: 2001:db8::1 +` + + miniOutput := &Config{} + _ = defaults.Set(miniOutput) + + miniOutput.Database.Host = "192.0.2.1" + miniOutput.Database.Database = "icingadb" + miniOutput.Database.User = "icingadb" + miniOutput.Database.Password = "icingadb" + + miniOutput.Redis.Host = "2001:db8::1" + miniOutput.Logging.Output = logging.CONSOLE + + subtests := []struct { + name string + input string + output *Config + warn bool + }{ + { + name: "mini", + input: miniConf, + output: miniOutput, + warn: false, + }, + { + name: "mini-with-unknown", + input: miniConf + "\nunknown: 42", + output: miniOutput, + warn: true, + }, + } + + for _, st := range subtests { + t.Run(st.name, func(t *testing.T) { + tempFile, err := os.CreateTemp("", "") + require.NoError(t, err) + defer func() { _ = os.Remove(tempFile.Name()) }() + + require.NoError(t, os.WriteFile(tempFile.Name(), []byte(st.input), 0o600)) + + actual, err := FromYAMLFile(tempFile.Name()) + require.NoError(t, err) + + if st.warn { + require.Error(t, actual.DecodeWarning, "reading config should produce a warning") + + // Reset the warning so that the following require.Equal() doesn't try to compare it. + actual.DecodeWarning = nil + } else { + require.NoError(t, actual.DecodeWarning, "reading config should not produce a warning") + } + + require.Equal(t, st.output, actual) + }) + } +} diff --git a/pkg/config/database.go b/pkg/config/database.go new file mode 100644 index 0000000..b42ff8e --- /dev/null +++ b/pkg/config/database.go @@ -0,0 +1,175 @@ +package config + +import ( + "fmt" + "github.com/go-sql-driver/mysql" + "github.com/icinga/icingadb/pkg/driver" + "github.com/icinga/icingadb/pkg/icingadb" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/utils" + "github.com/jmoiron/sqlx" + "github.com/jmoiron/sqlx/reflectx" + "github.com/pkg/errors" + "net" + "net/url" + "strconv" + "strings" + "sync" + "time" +) + +var registerDriverOnce sync.Once + +// Database defines database client configuration. +type Database struct { + Type string `yaml:"type" default:"mysql"` + Host string `yaml:"host"` + Port int `yaml:"port"` + Database string `yaml:"database"` + User string `yaml:"user"` + Password string `yaml:"password"` + TlsOptions TLS `yaml:",inline"` + Options icingadb.Options `yaml:"options"` +} + +// Open prepares the DSN string and driver configuration, +// calls sqlx.Open, but returns *icingadb.DB. +func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { + registerDriverOnce.Do(func() { + driver.Register(logger) + }) + + var dsn string + switch d.Type { + case "mysql": + config := mysql.NewConfig() + + config.User = d.User + config.Passwd = d.Password + + if d.isUnixAddr() { + config.Net = "unix" + config.Addr = d.Host + } else { + config.Net = "tcp" + port := d.Port + if port == 0 { + port = 3306 + } + config.Addr = net.JoinHostPort(d.Host, fmt.Sprint(port)) + } + + config.DBName = d.Database + config.Timeout = time.Minute + config.Params = map[string]string{"sql_mode": "ANSI_QUOTES"} + + tlsConfig, err := d.TlsOptions.MakeConfig(d.Host) + if err != nil { + return nil, err + } + + if tlsConfig != nil { + config.TLSConfig = "icingadb" + if err := mysql.RegisterTLSConfig(config.TLSConfig, tlsConfig); err != nil { + return nil, errors.Wrap(err, "can't register TLS config") + } + } + + dsn = config.FormatDSN() + case "pgsql": + uri := &url.URL{ + Scheme: "postgres", + User: url.UserPassword(d.User, d.Password), + Path: "/" + url.PathEscape(d.Database), + } + + query := url.Values{ + "connect_timeout": {"60"}, + "binary_parameters": {"yes"}, + + // Host and port can alternatively be specified in the query string. lib/pq can't parse the connection URI + // if a Unix domain socket path is specified in the host part of the URI, therefore always use the query + // string. See also https://github.com/lib/pq/issues/796 + "host": {d.Host}, + } + if d.Port != 0 { + query["port"] = []string{strconv.FormatInt(int64(d.Port), 10)} + } + + if _, err := d.TlsOptions.MakeConfig(d.Host); err != nil { + return nil, err + } + + if d.TlsOptions.Enable { + if d.TlsOptions.Insecure { + query["sslmode"] = []string{"require"} + } else { + query["sslmode"] = []string{"verify-full"} + } + + if d.TlsOptions.Cert != "" { + query["sslcert"] = []string{d.TlsOptions.Cert} + } + + if d.TlsOptions.Key != "" { + query["sslkey"] = []string{d.TlsOptions.Key} + } + + if d.TlsOptions.Ca != "" { + query["sslrootcert"] = []string{d.TlsOptions.Ca} + } + } else { + query["sslmode"] = []string{"disable"} + } + + uri.RawQuery = query.Encode() + dsn = uri.String() + default: + return nil, unknownDbType(d.Type) + } + + db, err := sqlx.Open("icingadb-"+d.Type, dsn) + if err != nil { + return nil, errors.Wrap(err, "can't open database") + } + + db.SetMaxIdleConns(d.Options.MaxConnections / 3) + db.SetMaxOpenConns(d.Options.MaxConnections) + + db.Mapper = reflectx.NewMapperFunc("db", func(s string) string { + return utils.Key(s, '_') + }) + + return icingadb.NewDb(db, logger, &d.Options), nil +} + +// Validate checks constraints in the supplied database configuration and returns an error if they are violated. +func (d *Database) Validate() error { + switch d.Type { + case "mysql", "pgsql": + default: + return unknownDbType(d.Type) + } + + if d.Host == "" { + return errors.New("database host missing") + } + + if d.User == "" { + return errors.New("database user missing") + } + + if d.Database == "" { + return errors.New("database name missing") + } + + return d.Options.Validate() +} + +func (d *Database) isUnixAddr() bool { + return strings.HasPrefix(d.Host, "/") +} + +func unknownDbType(t string) error { + return errors.Errorf(`unknown database type %q, must be one of: "mysql", "pgsql"`, t) +} diff --git a/pkg/config/history_retention.go b/pkg/config/history_retention.go new file mode 100644 index 0000000..d4373b7 --- /dev/null +++ b/pkg/config/history_retention.go @@ -0,0 +1,30 @@ +package config + +import ( + "github.com/icinga/icingadb/pkg/icingadb/history" + "github.com/pkg/errors" + "time" +) + +// Retention defines configuration for history retention. +type Retention struct { + HistoryDays uint64 `yaml:"history-days"` + SlaDays uint64 `yaml:"sla-days"` + Interval time.Duration `yaml:"interval" default:"1h"` + Count uint64 `yaml:"count" default:"5000"` + Options history.RetentionOptions `yaml:"options"` +} + +// Validate checks constraints in the supplied retention configuration and +// returns an error if they are violated. +func (r *Retention) Validate() error { + if r.Interval <= 0 { + return errors.New("retention interval must be positive") + } + + if r.Count == 0 { + return errors.New("count must be greater than zero") + } + + return r.Options.Validate() +} diff --git a/pkg/config/logging.go b/pkg/config/logging.go new file mode 100644 index 0000000..9ccd35e --- /dev/null +++ b/pkg/config/logging.go @@ -0,0 +1,44 @@ +package config + +import ( + "github.com/icinga/icingadb/pkg/logging" + "github.com/pkg/errors" + "go.uber.org/zap/zapcore" + "os" + "time" +) + +// Logging defines Logger configuration. +type Logging struct { + // zapcore.Level at 0 is for info level. + Level zapcore.Level `yaml:"level" default:"0"` + Output string `yaml:"output"` + // Interval for periodic logging. + Interval time.Duration `yaml:"interval" default:"20s"` + + logging.Options `yaml:"options"` +} + +// Validate checks constraints in the supplied Logging configuration and returns an error if they are violated. +// Also configures the log output if it is not configured: +// systemd-journald is used when Icinga DB is running under systemd, otherwise stderr. +func (l *Logging) Validate() error { + if l.Interval <= 0 { + return errors.New("periodic logging interval must be positive") + } + + if l.Output == "" { + if _, ok := os.LookupEnv("NOTIFY_SOCKET"); ok { + // When started by systemd, NOTIFY_SOCKET is set by systemd for Type=notify supervised services, + // which is the default setting for the Icinga DB service. + // This assumes that Icinga DB is running under systemd, so set output to systemd-journald. + l.Output = logging.JOURNAL + } else { + // Otherwise set it to console, i.e. write log messages to stderr. + l.Output = logging.CONSOLE + } + } + + // To be on the safe side, always call AssertOutput. + return logging.AssertOutput(l.Output) +} diff --git a/pkg/config/redis.go b/pkg/config/redis.go new file mode 100644 index 0000000..38571e3 --- /dev/null +++ b/pkg/config/redis.go @@ -0,0 +1,116 @@ +package config + +import ( + "context" + "crypto/tls" + "fmt" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/pkg/backoff" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/retry" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "go.uber.org/zap" + "net" + "strings" + "time" +) + +// Redis defines Redis client configuration. +type Redis struct { + Host string `yaml:"host"` + Port int `yaml:"port" default:"6380"` + Password string `yaml:"password"` + TlsOptions TLS `yaml:",inline"` + Options icingaredis.Options `yaml:"options"` +} + +type ctxDialerFunc = func(ctx context.Context, network, addr string) (net.Conn, error) + +// NewClient prepares Redis client configuration, +// calls redis.NewClient, but returns *icingaredis.Client. +func (r *Redis) NewClient(logger *logging.Logger) (*icingaredis.Client, error) { + tlsConfig, err := r.TlsOptions.MakeConfig(r.Host) + if err != nil { + return nil, err + } + + var dialer ctxDialerFunc + dl := &net.Dialer{Timeout: 15 * time.Second} + + if tlsConfig == nil { + dialer = dl.DialContext + } else { + dialer = (&tls.Dialer{NetDialer: dl, Config: tlsConfig}).DialContext + } + + options := &redis.Options{ + Dialer: dialWithLogging(dialer, logger), + Password: r.Password, + DB: 0, // Use default DB, + ReadTimeout: r.Options.Timeout, + TLSConfig: tlsConfig, + } + + if strings.HasPrefix(r.Host, "/") { + options.Network = "unix" + options.Addr = r.Host + } else { + options.Network = "tcp" + options.Addr = net.JoinHostPort(r.Host, fmt.Sprint(r.Port)) + } + + c := redis.NewClient(options) + + opts := c.Options() + opts.PoolSize = utils.MaxInt(32, opts.PoolSize) + opts.MaxRetries = opts.PoolSize + 1 // https://github.com/go-redis/redis/issues/1737 + c = redis.NewClient(opts) + + return icingaredis.NewClient(c, logger, &r.Options), nil +} + +// dialWithLogging returns a Redis Dialer with logging capabilities. +func dialWithLogging(dialer ctxDialerFunc, logger *logging.Logger) ctxDialerFunc { + // dial behaves like net.Dialer#DialContext, + // but re-tries on common errors that are considered retryable. + return func(ctx context.Context, network, addr string) (conn net.Conn, err error) { + err = retry.WithBackoff( + ctx, + func(ctx context.Context) (err error) { + conn, err = dialer(ctx, network, addr) + return + }, + retry.Retryable, + backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), + retry.Settings{ + Timeout: 5 * time.Minute, + OnError: func(_ time.Duration, _ uint64, err, lastErr error) { + if lastErr == nil || err.Error() != lastErr.Error() { + logger.Warnw("Can't connect to Redis. Retrying", zap.Error(err)) + } + }, + OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) { + if attempt > 0 { + logger.Infow("Reconnected to Redis", + zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1)) + } + }, + }, + ) + + err = errors.Wrap(err, "can't connect to Redis") + + return + } +} + +// Validate checks constraints in the supplied Redis configuration and returns an error if they are violated. +func (r *Redis) Validate() error { + if r.Host == "" { + return errors.New("Redis host missing") + } + + return r.Options.Validate() +} diff --git a/pkg/contracts/contracts.go b/pkg/contracts/contracts.go new file mode 100644 index 0000000..a8b4201 --- /dev/null +++ b/pkg/contracts/contracts.go @@ -0,0 +1,90 @@ +package contracts + +// Entity is implemented by every type Icinga DB should synchronize. +type Entity interface { + Fingerprinter + IDer +} + +// Fingerprinter is implemented by every entity that uniquely identifies itself. +type Fingerprinter interface { + // Fingerprint returns the value that uniquely identifies the entity. + Fingerprint() Fingerprinter +} + +// ID is a unique identifier of an entity. +type ID interface { + // String returns the string representation form of the ID. + // The String method is used to use the ID in functions + // where it needs to be compared or hashed. + String() string +} + +// IDer is implemented by every entity that uniquely identifies itself. +type IDer interface { + ID() ID // ID returns the ID. + SetID(ID) // SetID sets the ID. +} + +// Equaler is implemented by every type that is comparable. +type Equaler interface { + Equal(Equaler) bool // Equal checks for equality. +} + +// Checksum is a unique identifier of an entity. +type Checksum interface { + Equaler + // String returns the string representation form of the Checksum. + // The String method is used to use the Checksum in functions + // where it needs to be compared or hashed. + String() string +} + +// Checksumer is implemented by every entity with a checksum. +type Checksumer interface { + Checksum() Checksum // Checksum returns the Checksum. + SetChecksum(Checksum) // SetChecksum sets the Checksum. +} + +// EntityFactoryFunc knows how to create an Entity. +type EntityFactoryFunc func() Entity + +// Waiter implements the Wait method, +// which blocks until execution is complete. +type Waiter interface { + Wait() error // Wait waits for execution to complete. +} + +// The WaiterFunc type is an adapter to allow the use of ordinary functions as Waiter. +// If f is a function with the appropriate signature, WaiterFunc(f) is a Waiter that calls f. +type WaiterFunc func() error + +// Wait implements the Waiter interface. +func (f WaiterFunc) Wait() error { + return f() +} + +// Initer implements the Init method, +// which initializes the object in addition to zeroing. +type Initer interface { + Init() // Init initializes the object. +} + +// Upserter implements the Upsert method, +// which returns a part of the object for ON DUPLICATE KEY UPDATE. +type Upserter interface { + Upsert() interface{} // Upsert partitions the object. +} + +// TableNamer implements the TableName method, +// which returns the table of the object. +type TableNamer interface { + TableName() string // TableName tells the table. +} + +// Scoper implements the Scope method, +// which returns a struct specifying the WHERE conditions that +// entities must satisfy in order to be SELECTed. +type Scoper interface { + Scope() interface{} +} diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go new file mode 100644 index 0000000..f529db4 --- /dev/null +++ b/pkg/driver/driver.go @@ -0,0 +1,114 @@ +package driver + +import ( + "context" + "database/sql" + "database/sql/driver" + "github.com/go-sql-driver/mysql" + "github.com/icinga/icingadb/pkg/backoff" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/retry" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" + "go.uber.org/zap" + "time" +) + +const MySQL = "icingadb-mysql" +const PostgreSQL = "icingadb-pgsql" + +var timeout = time.Minute * 5 + +// RetryConnector wraps driver.Connector with retry logic. +type RetryConnector struct { + driver.Connector + driver Driver +} + +// Connect implements part of the driver.Connector interface. +func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) { + var conn driver.Conn + err := errors.Wrap(retry.WithBackoff( + ctx, + func(ctx context.Context) (err error) { + conn, err = c.Connector.Connect(ctx) + return + }, + shouldRetry, + backoff.NewExponentialWithJitter(time.Millisecond*128, time.Minute*1), + retry.Settings{ + Timeout: timeout, + OnError: func(_ time.Duration, _ uint64, err, lastErr error) { + telemetry.UpdateCurrentDbConnErr(err) + + if lastErr == nil || err.Error() != lastErr.Error() { + c.driver.Logger.Warnw("Can't connect to database. Retrying", zap.Error(err)) + } + }, + OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) { + telemetry.UpdateCurrentDbConnErr(nil) + + if attempt > 0 { + c.driver.Logger.Infow("Reconnected to database", + zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1)) + } + }, + }, + ), "can't connect to database") + return conn, err +} + +// Driver implements part of the driver.Connector interface. +func (c RetryConnector) Driver() driver.Driver { + return c.driver +} + +// Driver wraps a driver.Driver that also must implement driver.DriverContext with logging capabilities and provides our RetryConnector. +type Driver struct { + ctxDriver + Logger *logging.Logger +} + +// OpenConnector implements the DriverContext interface. +func (d Driver) OpenConnector(name string) (driver.Connector, error) { + c, err := d.ctxDriver.OpenConnector(name) + if err != nil { + return nil, err + } + + return &RetryConnector{ + driver: d, + Connector: c, + }, nil +} + +// Register makes our database Driver available under the name "icingadb-*sql". +func Register(logger *logging.Logger) { + sql.Register(MySQL, &Driver{ctxDriver: &mysql.MySQLDriver{}, Logger: logger}) + sql.Register(PostgreSQL, &Driver{ctxDriver: PgSQLDriver{}, Logger: logger}) + _ = mysql.SetLogger(mysqlLogger(func(v ...interface{}) { logger.Debug(v...) })) + sqlx.BindDriver(PostgreSQL, sqlx.DOLLAR) +} + +// ctxDriver helps ensure that we only support drivers that implement driver.Driver and driver.DriverContext. +type ctxDriver interface { + driver.Driver + driver.DriverContext +} + +// mysqlLogger is an adapter that allows ordinary functions to be used as a logger for mysql.SetLogger. +type mysqlLogger func(v ...interface{}) + +// Print implements the mysql.Logger interface. +func (log mysqlLogger) Print(v ...interface{}) { + log(v) +} + +func shouldRetry(err error) bool { + if errors.Is(err, driver.ErrBadConn) { + return true + } + + return retry.Retryable(err) +} diff --git a/pkg/driver/pgsql.go b/pkg/driver/pgsql.go new file mode 100644 index 0000000..3c88fe0 --- /dev/null +++ b/pkg/driver/pgsql.go @@ -0,0 +1,22 @@ +package driver + +import ( + "database/sql/driver" + "github.com/lib/pq" +) + +// PgSQLDriver extends pq.Driver with driver.DriverContext compliance. +type PgSQLDriver struct { + pq.Driver +} + +// Assert interface compliance. +var ( + _ driver.Driver = PgSQLDriver{} + _ driver.DriverContext = PgSQLDriver{} +) + +// OpenConnector implements the driver.DriverContext interface. +func (PgSQLDriver) OpenConnector(name string) (driver.Connector, error) { + return pq.NewConnector(name) +} diff --git a/pkg/flatten/flatten.go b/pkg/flatten/flatten.go new file mode 100644 index 0000000..94a6e7e --- /dev/null +++ b/pkg/flatten/flatten.go @@ -0,0 +1,47 @@ +package flatten + +import ( + "database/sql" + "fmt" + "github.com/icinga/icingadb/pkg/types" + "strconv" +) + +// Flatten creates flat, one-dimensional maps from arbitrarily nested values, e.g. JSON. +func Flatten(value interface{}, prefix string) map[string]types.String { + var flatten func(string, interface{}) + flattened := make(map[string]types.String) + + flatten = func(key string, value interface{}) { + switch value := value.(type) { + case map[string]interface{}: + if len(value) == 0 { + flattened[key] = types.String{} + break + } + + for k, v := range value { + flatten(key+"."+k, v) + } + case []interface{}: + if len(value) == 0 { + flattened[key] = types.String{} + break + } + + for i, v := range value { + flatten(key+"["+strconv.Itoa(i)+"]", v) + } + default: + val := "null" + if value != nil { + val = fmt.Sprintf("%v", value) + } + flattened[key] = types.String{NullString: sql.NullString{String: val, Valid: true}} + } + } + + flatten(prefix, value) + + return flattened +} diff --git a/pkg/icingadb/cleanup.go b/pkg/icingadb/cleanup.go new file mode 100644 index 0000000..e57eafa --- /dev/null +++ b/pkg/icingadb/cleanup.go @@ -0,0 +1,80 @@ +package icingadb + +import ( + "context" + "fmt" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/driver" + "github.com/icinga/icingadb/pkg/types" + "time" +) + +// CleanupStmt defines information needed to compose cleanup statements. +type CleanupStmt struct { + Table string + PK string + Column string +} + +// Build assembles the cleanup statement for the specified database driver with the given limit. +func (stmt *CleanupStmt) Build(driverName string, limit uint64) string { + switch driverName { + case driver.MySQL, "mysql": + return fmt.Sprintf(`DELETE FROM %[1]s WHERE environment_id = :environment_id AND %[2]s < :time +ORDER BY %[2]s LIMIT %[3]d`, stmt.Table, stmt.Column, limit) + case driver.PostgreSQL, "postgres": + return fmt.Sprintf(`WITH rows AS ( +SELECT %[1]s FROM %[2]s WHERE environment_id = :environment_id AND %[3]s < :time ORDER BY %[3]s LIMIT %[4]d +) +DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`, stmt.PK, stmt.Table, stmt.Column, limit) + default: + panic(fmt.Sprintf("invalid database type %s", driverName)) + } +} + +// CleanupOlderThan deletes all rows with the specified statement that are older than the given time. +// Deletes a maximum of as many rows per round as defined in count. Actually deleted rows will be passed to onSuccess. +// Returns the total number of rows deleted. +func (db *DB) CleanupOlderThan( + ctx context.Context, stmt CleanupStmt, envId types.Binary, + count uint64, olderThan time.Time, onSuccess ...OnSuccess[struct{}], +) (uint64, error) { + var counter com.Counter + defer db.log(ctx, stmt.Build(db.DriverName(), 0), &counter).Stop() + + for { + q := db.Rebind(stmt.Build(db.DriverName(), count)) + rs, err := db.NamedExecContext(ctx, q, cleanupWhere{ + EnvironmentId: envId, + Time: types.UnixMilli(olderThan), + }) + if err != nil { + return 0, internal.CantPerformQuery(err, q) + } + + n, err := rs.RowsAffected() + if err != nil { + return 0, err + } + + counter.Add(uint64(n)) + + for _, onSuccess := range onSuccess { + if err := onSuccess(ctx, make([]struct{}, n)); err != nil { + return 0, err + } + } + + if n < int64(count) { + break + } + } + + return counter.Total(), nil +} + +type cleanupWhere struct { + EnvironmentId types.Binary + Time types.UnixMilli +} diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go new file mode 100644 index 0000000..4ff3e0d --- /dev/null +++ b/pkg/icingadb/db.go @@ -0,0 +1,673 @@ +package icingadb + +import ( + "context" + "fmt" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/backoff" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/driver" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" + "github.com/icinga/icingadb/pkg/retry" + "github.com/icinga/icingadb/pkg/utils" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" + "reflect" + "strings" + "sync" + "time" +) + +// DB is a wrapper around sqlx.DB with bulk execution, +// statement building, streaming and logging capabilities. +type DB struct { + *sqlx.DB + + Options *Options + + logger *logging.Logger + tableSemaphores map[string]*semaphore.Weighted + tableSemaphoresMu sync.Mutex +} + +// Options define user configurable database options. +type Options struct { + // Maximum number of open connections to the database. + MaxConnections int `yaml:"max_connections" default:"16"` + + // Maximum number of connections per table, + // regardless of what the connection is actually doing, + // e.g. INSERT, UPDATE, DELETE. + MaxConnectionsPerTable int `yaml:"max_connections_per_table" default:"8"` + + // MaxPlaceholdersPerStatement defines the maximum number of placeholders in an + // INSERT, UPDATE or DELETE statement. Theoretically, MySQL can handle up to 2^16-1 placeholders, + // but this increases the execution time of queries and thus reduces the number of queries + // that can be executed in parallel in a given time. + // The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism. + MaxPlaceholdersPerStatement int `yaml:"max_placeholders_per_statement" default:"8192"` + + // MaxRowsPerTransaction defines the maximum number of rows per transaction. + // The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism. + MaxRowsPerTransaction int `yaml:"max_rows_per_transaction" default:"8192"` +} + +// Validate checks constraints in the supplied database options and returns an error if they are violated. +func (o *Options) Validate() error { + if o.MaxConnections == 0 { + return errors.New("max_connections cannot be 0. Configure a value greater than zero, or use -1 for no connection limit") + } + if o.MaxConnectionsPerTable < 1 { + return errors.New("max_connections_per_table must be at least 1") + } + if o.MaxPlaceholdersPerStatement < 1 { + return errors.New("max_placeholders_per_statement must be at least 1") + } + if o.MaxRowsPerTransaction < 1 { + return errors.New("max_rows_per_transaction must be at least 1") + } + + return nil +} + +// NewDb returns a new icingadb.DB wrapper for a pre-existing *sqlx.DB. +func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB { + return &DB{ + DB: db, + logger: logger, + Options: options, + tableSemaphores: make(map[string]*semaphore.Weighted), + } +} + +const ( + expectedMysqlSchemaVersion = 4 + expectedPostgresSchemaVersion = 2 +) + +// CheckSchema asserts the database schema of the expected version being present. +func (db *DB) CheckSchema(ctx context.Context) error { + var expectedDbSchemaVersion uint16 + switch db.DriverName() { + case driver.MySQL: + expectedDbSchemaVersion = expectedMysqlSchemaVersion + case driver.PostgreSQL: + expectedDbSchemaVersion = expectedPostgresSchemaVersion + } + + var version uint16 + + err := db.QueryRowxContext(ctx, "SELECT version FROM icingadb_schema ORDER BY id DESC LIMIT 1").Scan(&version) + if err != nil { + return errors.Wrap(err, "can't check database schema version") + } + + if version != expectedDbSchemaVersion { + // Since these error messages are trivial and mostly caused by users, we don't need + // to print a stack trace here. However, since errors.Errorf() does this automatically, + // we need to use fmt instead. + return fmt.Errorf( + "unexpected database schema version: v%d (expected v%d), please make sure you have applied all database"+ + " migrations after upgrading Icinga DB", version, expectedDbSchemaVersion, + ) + } + + return nil +} + +// BuildColumns returns all columns of the given struct. +func (db *DB) BuildColumns(subject interface{}) []string { + fields := db.Mapper.TypeMap(reflect.TypeOf(subject)).Names + columns := make([]string, 0, len(fields)) + for _, f := range fields { + if f.Field.Tag == "" { + continue + } + columns = append(columns, f.Name) + } + + return columns +} + +// BuildDeleteStmt returns a DELETE statement for the given struct. +func (db *DB) BuildDeleteStmt(from interface{}) string { + return fmt.Sprintf( + `DELETE FROM "%s" WHERE id IN (?)`, + utils.TableName(from), + ) +} + +// BuildInsertStmt returns an INSERT INTO statement for the given struct. +func (db *DB) BuildInsertStmt(into interface{}) (string, int) { + columns := db.BuildColumns(into) + + return fmt.Sprintf( + `INSERT INTO "%s" ("%s") VALUES (%s)`, + utils.TableName(into), + strings.Join(columns, `", "`), + fmt.Sprintf(":%s", strings.Join(columns, ", :")), + ), len(columns) +} + +// BuildInsertIgnoreStmt returns an INSERT statement for the specified struct for +// which the database ignores rows that have already been inserted. +func (db *DB) BuildInsertIgnoreStmt(into interface{}) (string, int) { + table := utils.TableName(into) + columns := db.BuildColumns(into) + var clause string + + switch db.DriverName() { + case driver.MySQL: + // MySQL treats UPDATE id = id as a no-op. + clause = fmt.Sprintf(`ON DUPLICATE KEY UPDATE "%s" = "%s"`, columns[0], columns[0]) + case driver.PostgreSQL: + clause = fmt.Sprintf("ON CONFLICT ON CONSTRAINT pk_%s DO NOTHING", table) + } + + return fmt.Sprintf( + `INSERT INTO "%s" ("%s") VALUES (%s) %s`, + table, + strings.Join(columns, `", "`), + fmt.Sprintf(":%s", strings.Join(columns, ", :")), + clause, + ), len(columns) +} + +// BuildSelectStmt returns a SELECT query that creates the FROM part from the given table struct +// and the column list from the specified columns struct. +func (db *DB) BuildSelectStmt(table interface{}, columns interface{}) string { + q := fmt.Sprintf( + `SELECT "%s" FROM "%s"`, + strings.Join(db.BuildColumns(columns), `", "`), + utils.TableName(table), + ) + + if scoper, ok := table.(contracts.Scoper); ok { + where, _ := db.BuildWhere(scoper.Scope()) + q += ` WHERE ` + where + } + + return q +} + +// BuildUpdateStmt returns an UPDATE statement for the given struct. +func (db *DB) BuildUpdateStmt(update interface{}) (string, int) { + columns := db.BuildColumns(update) + set := make([]string, 0, len(columns)) + + for _, col := range columns { + set = append(set, fmt.Sprintf(`"%s" = :%s`, col, col)) + } + + return fmt.Sprintf( + `UPDATE "%s" SET %s WHERE id = :id`, + utils.TableName(update), + strings.Join(set, ", "), + ), len(columns) + 1 // +1 because of WHERE id = :id +} + +// BuildUpsertStmt returns an upsert statement for the given struct. +func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int) { + insertColumns := db.BuildColumns(subject) + table := utils.TableName(subject) + var updateColumns []string + + if upserter, ok := subject.(contracts.Upserter); ok { + updateColumns = db.BuildColumns(upserter.Upsert()) + } else { + updateColumns = insertColumns + } + + var clause, setFormat string + switch db.DriverName() { + case driver.MySQL: + clause = "ON DUPLICATE KEY UPDATE" + setFormat = `"%[1]s" = VALUES("%[1]s")` + case driver.PostgreSQL: + clause = fmt.Sprintf("ON CONFLICT ON CONSTRAINT pk_%s DO UPDATE SET", table) + setFormat = `"%[1]s" = EXCLUDED."%[1]s"` + } + + set := make([]string, 0, len(updateColumns)) + + for _, col := range updateColumns { + set = append(set, fmt.Sprintf(setFormat, col)) + } + + return fmt.Sprintf( + `INSERT INTO "%s" ("%s") VALUES (%s) %s %s`, + table, + strings.Join(insertColumns, `", "`), + fmt.Sprintf(":%s", strings.Join(insertColumns, ",:")), + clause, + strings.Join(set, ","), + ), len(insertColumns) +} + +// BuildWhere returns a WHERE clause with named placeholder conditions built from the specified struct +// combined with the AND operator. +func (db *DB) BuildWhere(subject interface{}) (string, int) { + columns := db.BuildColumns(subject) + where := make([]string, 0, len(columns)) + for _, col := range columns { + where = append(where, fmt.Sprintf(`"%s" = :%s`, col, col)) + } + + return strings.Join(where, ` AND `), len(columns) +} + +// OnSuccess is a callback for successful (bulk) DML operations. +type OnSuccess[T any] func(ctx context.Context, affectedRows []T) (err error) + +func OnSuccessIncrement[T any](counter *com.Counter) OnSuccess[T] { + return func(_ context.Context, rows []T) error { + counter.Add(uint64(len(rows))) + return nil + } +} + +func OnSuccessSendTo[T any](ch chan<- T) OnSuccess[T] { + return func(ctx context.Context, rows []T) error { + for _, row := range rows { + select { + case ch <- row: + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil + } +} + +// BulkExec bulk executes queries with a single slice placeholder in the form of `IN (?)`. +// Takes in up to the number of arguments specified in count from the arg stream, +// derives and expands a query and executes it with this set of arguments until the arg stream has been processed. +// The derived queries are executed in a separate goroutine with a weighting of 1 +// and can be executed concurrently to the extent allowed by the semaphore passed in sem. +// Arguments for which the query ran successfully will be passed to onSuccess. +func (db *DB) BulkExec( + ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan any, onSuccess ...OnSuccess[any], +) error { + var counter com.Counter + defer db.log(ctx, query, &counter).Stop() + + g, ctx := errgroup.WithContext(ctx) + // Use context from group. + bulk := com.Bulk(ctx, arg, count, com.NeverSplit[any]) + + g.Go(func() error { + g, ctx := errgroup.WithContext(ctx) + + for b := range bulk { + if err := sem.Acquire(ctx, 1); err != nil { + return errors.Wrap(err, "can't acquire semaphore") + } + + g.Go(func(b []interface{}) func() error { + return func() error { + defer sem.Release(1) + + return retry.WithBackoff( + ctx, + func(context.Context) error { + stmt, args, err := sqlx.In(query, b) + if err != nil { + return errors.Wrapf(err, "can't build placeholders for %q", query) + } + + stmt = db.Rebind(stmt) + _, err = db.ExecContext(ctx, stmt, args...) + if err != nil { + return internal.CantPerformQuery(err, query) + } + + counter.Add(uint64(len(b))) + + for _, onSuccess := range onSuccess { + if err := onSuccess(ctx, b); err != nil { + return err + } + } + + return nil + }, + retry.Retryable, + backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), + retry.Settings{}, + ) + } + }(b)) + } + + return g.Wait() + }) + + return g.Wait() +} + +// NamedBulkExec bulk executes queries with named placeholders in a VALUES clause most likely +// in the format INSERT ... VALUES. Takes in up to the number of entities specified in count +// from the arg stream, derives and executes a new query with the VALUES clause expanded to +// this set of arguments, until the arg stream has been processed. +// The queries are executed in a separate goroutine with a weighting of 1 +// and can be executed concurrently to the extent allowed by the semaphore passed in sem. +// Entities for which the query ran successfully will be passed to onSuccess. +func (db *DB) NamedBulkExec( + ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity, + splitPolicyFactory com.BulkChunkSplitPolicyFactory[contracts.Entity], onSuccess ...OnSuccess[contracts.Entity], +) error { + var counter com.Counter + defer db.log(ctx, query, &counter).Stop() + + g, ctx := errgroup.WithContext(ctx) + bulk := com.Bulk(ctx, arg, count, splitPolicyFactory) + + g.Go(func() error { + for { + select { + case b, ok := <-bulk: + if !ok { + return nil + } + + if err := sem.Acquire(ctx, 1); err != nil { + return errors.Wrap(err, "can't acquire semaphore") + } + + g.Go(func(b []contracts.Entity) func() error { + return func() error { + defer sem.Release(1) + + return retry.WithBackoff( + ctx, + func(ctx context.Context) error { + _, err := db.NamedExecContext(ctx, query, b) + if err != nil { + return internal.CantPerformQuery(err, query) + } + + counter.Add(uint64(len(b))) + + for _, onSuccess := range onSuccess { + if err := onSuccess(ctx, b); err != nil { + return err + } + } + + return nil + }, + retry.Retryable, + backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), + retry.Settings{}, + ) + } + }(b)) + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + return g.Wait() +} + +// NamedBulkExecTx bulk executes queries with named placeholders in separate transactions. +// Takes in up to the number of entities specified in count from the arg stream and +// executes a new transaction that runs a new query for each entity in this set of arguments, +// until the arg stream has been processed. +// The transactions are executed in a separate goroutine with a weighting of 1 +// and can be executed concurrently to the extent allowed by the semaphore passed in sem. +func (db *DB) NamedBulkExecTx( + ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity, +) error { + var counter com.Counter + defer db.log(ctx, query, &counter).Stop() + + g, ctx := errgroup.WithContext(ctx) + bulk := com.Bulk(ctx, arg, count, com.NeverSplit[contracts.Entity]) + + g.Go(func() error { + for { + select { + case b, ok := <-bulk: + if !ok { + return nil + } + + if err := sem.Acquire(ctx, 1); err != nil { + return errors.Wrap(err, "can't acquire semaphore") + } + + g.Go(func(b []contracts.Entity) func() error { + return func() error { + defer sem.Release(1) + + return retry.WithBackoff( + ctx, + func(ctx context.Context) error { + tx, err := db.BeginTxx(ctx, nil) + if err != nil { + return errors.Wrap(err, "can't start transaction") + } + + stmt, err := tx.PrepareNamedContext(ctx, query) + if err != nil { + return errors.Wrap(err, "can't prepare named statement with context in transaction") + } + + for _, arg := range b { + if _, err := stmt.ExecContext(ctx, arg); err != nil { + return errors.Wrap(err, "can't execute statement in transaction") + } + } + + if err := tx.Commit(); err != nil { + return errors.Wrap(err, "can't commit transaction") + } + + counter.Add(uint64(len(b))) + + return nil + }, + retry.Retryable, + backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), + retry.Settings{}, + ) + } + }(b)) + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + return g.Wait() +} + +// BatchSizeByPlaceholders returns how often the specified number of placeholders fits +// into Options.MaxPlaceholdersPerStatement, but at least 1. +func (db *DB) BatchSizeByPlaceholders(n int) int { + s := db.Options.MaxPlaceholdersPerStatement / n + if s > 0 { + return s + } + + return 1 +} + +// YieldAll executes the query with the supplied scope, +// scans each resulting row into an entity returned by the factory function, +// and streams them into a returned channel. +func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, scope interface{}) (<-chan contracts.Entity, <-chan error) { + entities := make(chan contracts.Entity, 1) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + var counter com.Counter + defer db.log(ctx, query, &counter).Stop() + defer close(entities) + + rows, err := db.NamedQueryContext(ctx, query, scope) + if err != nil { + return internal.CantPerformQuery(err, query) + } + defer rows.Close() + + for rows.Next() { + e := factoryFunc() + + if err := rows.StructScan(e); err != nil { + return errors.Wrapf(err, "can't store query result into a %T: %s", e, query) + } + + select { + case entities <- e: + counter.Inc() + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil + }) + + return entities, com.WaitAsync(g) +} + +// CreateStreamed bulk creates the specified entities via NamedBulkExec. +// The insert statement is created using BuildInsertStmt with the first entity from the entities stream. +// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and +// concurrency is controlled via Options.MaxConnectionsPerTable. +// Entities for which the query ran successfully will be passed to onSuccess. +func (db *DB) CreateStreamed( + ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity], +) error { + first, forward, err := com.CopyFirst(ctx, entities) + if first == nil { + return errors.Wrap(err, "can't copy first entity") + } + + sem := db.GetSemaphoreForTable(utils.TableName(first)) + stmt, placeholders := db.BuildInsertStmt(first) + + return db.NamedBulkExec( + ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, + forward, com.NeverSplit[contracts.Entity], onSuccess..., + ) +} + +// CreateIgnoreStreamed bulk creates the specified entities via NamedBulkExec. +// The insert statement is created using BuildInsertIgnoreStmt with the first entity from the entities stream. +// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and +// concurrency is controlled via Options.MaxConnectionsPerTable. +// Entities for which the query ran successfully will be passed to onSuccess. +func (db *DB) CreateIgnoreStreamed( + ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity], +) error { + first, forward, err := com.CopyFirst(ctx, entities) + if first == nil { + return errors.Wrap(err, "can't copy first entity") + } + + sem := db.GetSemaphoreForTable(utils.TableName(first)) + stmt, placeholders := db.BuildInsertIgnoreStmt(first) + + return db.NamedBulkExec( + ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, + forward, com.SplitOnDupId[contracts.Entity], onSuccess..., + ) +} + +// UpsertStreamed bulk upserts the specified entities via NamedBulkExec. +// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream. +// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and +// concurrency is controlled via Options.MaxConnectionsPerTable. +// Entities for which the query ran successfully will be passed to onSuccess. +func (db *DB) UpsertStreamed( + ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity], +) error { + first, forward, err := com.CopyFirst(ctx, entities) + if first == nil { + return errors.Wrap(err, "can't copy first entity") + } + + sem := db.GetSemaphoreForTable(utils.TableName(first)) + stmt, placeholders := db.BuildUpsertStmt(first) + + return db.NamedBulkExec( + ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, + forward, com.SplitOnDupId[contracts.Entity], onSuccess..., + ) +} + +// UpdateStreamed bulk updates the specified entities via NamedBulkExecTx. +// The update statement is created using BuildUpdateStmt with the first entity from the entities stream. +// Bulk size is controlled via Options.MaxRowsPerTransaction and +// concurrency is controlled via Options.MaxConnectionsPerTable. +func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Entity) error { + first, forward, err := com.CopyFirst(ctx, entities) + if first == nil { + return errors.Wrap(err, "can't copy first entity") + } + sem := db.GetSemaphoreForTable(utils.TableName(first)) + stmt, _ := db.BuildUpdateStmt(first) + + return db.NamedBulkExecTx(ctx, stmt, db.Options.MaxRowsPerTransaction, sem, forward) +} + +// DeleteStreamed bulk deletes the specified ids via BulkExec. +// The delete statement is created using BuildDeleteStmt with the passed entityType. +// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and +// concurrency is controlled via Options.MaxConnectionsPerTable. +// IDs for which the query ran successfully will be passed to onSuccess. +func (db *DB) DeleteStreamed( + ctx context.Context, entityType contracts.Entity, ids <-chan interface{}, onSuccess ...OnSuccess[any], +) error { + sem := db.GetSemaphoreForTable(utils.TableName(entityType)) + return db.BulkExec( + ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, onSuccess..., + ) +} + +// Delete creates a channel from the specified ids and +// bulk deletes them by passing the channel along with the entityType to DeleteStreamed. +// IDs for which the query ran successfully will be passed to onSuccess. +func (db *DB) Delete( + ctx context.Context, entityType contracts.Entity, ids []interface{}, onSuccess ...OnSuccess[any], +) error { + idsCh := make(chan interface{}, len(ids)) + for _, id := range ids { + idsCh <- id + } + close(idsCh) + + return db.DeleteStreamed(ctx, entityType, idsCh, onSuccess...) +} + +func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted { + db.tableSemaphoresMu.Lock() + defer db.tableSemaphoresMu.Unlock() + + if sem, ok := db.tableSemaphores[table]; ok { + return sem + } else { + sem = semaphore.NewWeighted(int64(db.Options.MaxConnectionsPerTable)) + db.tableSemaphores[table] = sem + return sem + } +} + +func (db *DB) log(ctx context.Context, query string, counter *com.Counter) periodic.Stopper { + return periodic.Start(ctx, db.logger.Interval(), func(tick periodic.Tick) { + if count := counter.Reset(); count > 0 { + db.logger.Debugf("Executed %q with %d rows", query, count) + } + }, periodic.OnStop(func(tick periodic.Tick) { + db.logger.Debugf("Finished executing %q with %d rows in %s", query, counter.Total(), tick.Elapsed) + })) +} diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go new file mode 100644 index 0000000..4f6d098 --- /dev/null +++ b/pkg/icingadb/delta.go @@ -0,0 +1,124 @@ +package icingadb + +import ( + "context" + "fmt" + "github.com/icinga/icingadb/pkg/common" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/utils" + "go.uber.org/zap" + "time" +) + +// Delta calculates the delta of actual and desired entities, and stores which entities need to be created, updated, and deleted. +type Delta struct { + Create EntitiesById + Update EntitiesById + Delete EntitiesById + Subject *common.SyncSubject + done chan error + logger *logging.Logger +} + +// NewDelta creates a new Delta and starts calculating it. The caller must ensure +// that no duplicate entities are sent to the same stream. +func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *logging.Logger) *Delta { + delta := &Delta{ + Subject: subject, + done: make(chan error, 1), + logger: logger, + } + + go delta.run(ctx, actual, desired) + + return delta +} + +// Wait waits for the delta calculation to complete and returns an error, if any. +func (delta *Delta) Wait() error { + return <-delta.done +} + +func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan contracts.Entity) { + defer close(delta.done) + + start := time.Now() + var endActual, endDesired time.Time + var numActual, numDesired uint64 + + actual := EntitiesById{} // only read from actualCh (so far) + desired := EntitiesById{} // only read from desiredCh (so far) + + var update EntitiesById + if delta.Subject.WithChecksum() { + update = EntitiesById{} // read from actualCh and desiredCh with mismatching checksums + } + + for actualCh != nil || desiredCh != nil { + select { + case actualValue, ok := <-actualCh: + if !ok { + endActual = time.Now() + actualCh = nil // Done reading all actual entities, disable this case. + break + } + numActual++ + + id := actualValue.ID().String() + if desiredValue, ok := desired[id]; ok { + delete(desired, id) + if update != nil && !checksumsMatch(actualValue, desiredValue) { + update[id] = desiredValue + } + } else { + actual[id] = actualValue + } + + case desiredValue, ok := <-desiredCh: + if !ok { + endDesired = time.Now() + desiredCh = nil // Done reading all desired entities, disable this case. + break + } + numDesired++ + + id := desiredValue.ID().String() + if actualValue, ok := actual[id]; ok { + delete(actual, id) + if update != nil && !checksumsMatch(actualValue, desiredValue) { + update[id] = desiredValue + } + } else { + desired[id] = desiredValue + } + + case <-ctx.Done(): + delta.done <- ctx.Err() + return + } + } + + delta.Create = desired + delta.Update = update + delta.Delete = actual + + delta.logger.Debugw(fmt.Sprintf("Finished %s delta", utils.Name(delta.Subject.Entity())), + zap.String("subject", utils.Name(delta.Subject.Entity())), + zap.Duration("time_total", time.Since(start)), + zap.Duration("time_actual", endActual.Sub(start)), + zap.Duration("time_desired", endDesired.Sub(start)), + zap.Uint64("num_actual", numActual), + zap.Uint64("num_desired", numDesired), + zap.Int("create", len(delta.Create)), + zap.Int("update", len(delta.Update)), + zap.Int("delete", len(delta.Delete))) +} + +// checksumsMatch returns whether the checksums of two entities are the same. +// Both entities must implement contracts.Checksumer. +func checksumsMatch(a, b contracts.Entity) bool { + c1 := a.(contracts.Checksumer).Checksum() + c2 := b.(contracts.Checksumer).Checksum() + return c1.Equal(c2) +} diff --git a/pkg/icingadb/delta_test.go b/pkg/icingadb/delta_test.go new file mode 100644 index 0000000..909cc22 --- /dev/null +++ b/pkg/icingadb/delta_test.go @@ -0,0 +1,268 @@ +package icingadb + +import ( + "context" + "encoding/binary" + "github.com/icinga/icingadb/pkg/common" + "github.com/icinga/icingadb/pkg/contracts" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest" + "strconv" + "sync" + "testing" + "time" +) + +func TestDelta(t *testing.T) { + type TestData struct { + Name string // name for the sub-test + Actual, Desired uint64 // checksum to send to actual/desired + Create, Update, Delete uint64 // checksum that must be in the corresponding map (if != 0) + } + + tests := []TestData{{ + Name: "Empty", + }, { + Name: "Create", + Desired: 0x1111111111111111, + Create: 0x1111111111111111, + }, { + Name: "Update", + Actual: 0x1111111111111111, + Desired: 0x2222222222222222, + Update: 0x2222222222222222, + }, { + Name: "Delete", + Actual: 0x1111111111111111, + Delete: 0x1111111111111111, + }, { + Name: "Keep", + Actual: 0x1111111111111111, + Desired: 0x1111111111111111, + }} + + makeEndpoint := func(id, checksum uint64) *v1.Endpoint { + e := new(v1.Endpoint) + e.Id = testDeltaMakeIdOrChecksum(id) + e.PropertiesChecksum = testDeltaMakeIdOrChecksum(checksum) + return e + } + + // Send the entities to the actual and desired channels in different ordering to catch bugs in the implementation + // that only show depending on the order in which actual and desired values are processed for an ID. + type SendOrder struct { + Name string + Send func(id uint64, test TestData, chActual, chDesired chan<- contracts.Entity) + } + sendOrders := []SendOrder{{ + Name: "ActualFirst", + Send: func(id uint64, test TestData, chActual, chDesired chan<- contracts.Entity) { + if test.Actual != 0 { + chActual <- makeEndpoint(id, test.Actual) + } + if test.Desired != 0 { + chDesired <- makeEndpoint(id, test.Desired) + } + }, + }, { + Name: "DesiredFirst", + Send: func(id uint64, test TestData, chActual, chDesired chan<- contracts.Entity) { + if test.Desired != 0 { + chDesired <- makeEndpoint(id, test.Desired) + } + if test.Actual != 0 { + chActual <- makeEndpoint(id, test.Actual) + } + }, + }} + + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + for _, sendOrder := range sendOrders { + t.Run(sendOrder.Name, func(t *testing.T) { + id := uint64(0x42) + chActual := make(chan contracts.Entity) + chDesired := make(chan contracts.Entity) + subject := common.NewSyncSubject(v1.NewEndpoint) + logger := logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Second) + + go func() { + sendOrder.Send(id, test, chActual, chDesired) + close(chActual) + close(chDesired) + }() + + delta := NewDelta(context.Background(), chActual, chDesired, subject, logger) + err := delta.Wait() + require.NoError(t, err, "delta should finish without error") + + _, ok := <-chActual + require.False(t, ok, "chActual should have been closed") + _, ok = <-chDesired + require.False(t, ok, "chDesired should have been closed") + + testDeltaVerifyResult(t, "Create", testDeltaMakeExpectedMap(id, test.Create), delta.Create) + testDeltaVerifyResult(t, "Update", testDeltaMakeExpectedMap(id, test.Update), delta.Update) + testDeltaVerifyResult(t, "Delete", testDeltaMakeExpectedMap(id, test.Delete), delta.Delete) + }) + } + }) + } + + t.Run("Combined", func(t *testing.T) { + chActual := make(chan contracts.Entity) + chDesired := make(chan contracts.Entity) + subject := common.NewSyncSubject(v1.NewEndpoint) + logger := logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Second) + + expectedCreate := make(map[uint64]uint64) + expectedUpdate := make(map[uint64]uint64) + expectedDelete := make(map[uint64]uint64) + + nextId := uint64(1) + var wg sync.WaitGroup + for _, test := range tests { + test := test + for _, sendOrder := range sendOrders { + sendOrder := sendOrder + id := nextId + nextId++ + // Log ID mapping to allow easier debugging in case of failures. + t.Logf("ID=%d(%s) Test=%s SendOrder=%s", + id, testDeltaMakeIdOrChecksum(id).String(), test.Name, sendOrder.Name) + wg.Add(1) + go func() { + defer wg.Done() + sendOrder.Send(id, test, chActual, chDesired) + }() + + if test.Create != 0 { + expectedCreate[id] = test.Create + } + if test.Update != 0 { + expectedUpdate[id] = test.Update + } + if test.Delete != 0 { + expectedDelete[id] = test.Delete + } + } + } + go func() { + wg.Wait() + close(chActual) + close(chDesired) + }() + + delta := NewDelta(context.Background(), chActual, chDesired, subject, logger) + err := delta.Wait() + require.NoError(t, err, "delta should finish without error") + + _, ok := <-chActual + require.False(t, ok, "chActual should have been closed") + _, ok = <-chDesired + require.False(t, ok, "chDesired should have been closed") + + testDeltaVerifyResult(t, "Create", expectedCreate, delta.Create) + testDeltaVerifyResult(t, "Update", expectedUpdate, delta.Update) + testDeltaVerifyResult(t, "Delete", expectedDelete, delta.Delete) + }) +} + +func testDeltaMakeIdOrChecksum(i uint64) types.Binary { + b := make([]byte, 20) + binary.BigEndian.PutUint64(b, i) + return b +} + +func testDeltaMakeExpectedMap(id uint64, checksum uint64) map[uint64]uint64 { + if checksum == 0 { + return nil + } else { + return map[uint64]uint64{ + id: checksum, + } + } +} + +func testDeltaVerifyResult(t *testing.T, name string, expected map[uint64]uint64, got EntitiesById) { + for id, checksum := range expected { + idKey := testDeltaMakeIdOrChecksum(id).String() + if assert.Containsf(t, got, idKey, "%s: should contain %s", name, idKey) { + expectedChecksum := testDeltaMakeIdOrChecksum(checksum).String() + gotChecksum := got[idKey].(contracts.Checksumer).Checksum().String() + assert.Equalf(t, expectedChecksum, gotChecksum, "%s: %s should match checksum", name, idKey) + delete(got, idKey) + } + } + + for id := range got { + assert.Failf(t, "unexpected element", "%s: should not contain %s", name, id) + } +} + +func BenchmarkDelta(b *testing.B) { + for n := 1 << 10; n <= 1<<20; n <<= 1 { + b.Run(strconv.Itoa(n), func(b *testing.B) { + benchmarkDelta(b, n) + }) + } +} + +func benchmarkDelta(b *testing.B, numEntities int) { + chActual := make([]chan contracts.Entity, b.N) + chDesired := make([]chan contracts.Entity, b.N) + for i := 0; i < b.N; i++ { + chActual[i] = make(chan contracts.Entity, numEntities) + chDesired[i] = make(chan contracts.Entity, numEntities) + } + makeEndpoint := func(id1, id2, checksum uint64) *v1.Endpoint { + e := new(v1.Endpoint) + e.Id = make([]byte, 20) + binary.BigEndian.PutUint64(e.Id[0:], id1) + binary.BigEndian.PutUint64(e.Id[8:], id2) + e.PropertiesChecksum = make([]byte, 20) + binary.BigEndian.PutUint64(e.PropertiesChecksum, checksum) + return e + } + for i := 0; i < numEntities; i++ { + // each iteration writes exactly one entity to each channel + var eActual, eDesired contracts.Entity + switch i % 3 { + case 0: // distinct IDs + eActual = makeEndpoint(1, uint64(i), uint64(i)) + eDesired = makeEndpoint(2, uint64(i), uint64(i)) + case 1: // same ID, same checksum + e := makeEndpoint(3, uint64(i), uint64(i)) + eActual = e + eDesired = e + case 2: // same ID, different checksum + eActual = makeEndpoint(4, uint64(i), uint64(i)) + eDesired = makeEndpoint(4, uint64(i), uint64(i+1)) + } + for _, ch := range chActual { + ch <- eActual + } + for _, ch := range chDesired { + ch <- eDesired + } + } + for i := 0; i < b.N; i++ { + close(chActual[i]) + close(chDesired[i]) + } + subject := common.NewSyncSubject(v1.NewEndpoint) + // logger := zaptest.NewLogger(b).Sugar() + logger := logging.NewLogger(zap.New(zapcore.NewTee()).Sugar(), time.Second) + b.ResetTimer() + for i := 0; i < b.N; i++ { + d := NewDelta(context.Background(), chActual[i], chDesired[i], subject, logger) + err := d.Wait() + assert.NoError(b, err, "delta should not fail") + } +} diff --git a/pkg/icingadb/dump_signals.go b/pkg/icingadb/dump_signals.go new file mode 100644 index 0000000..bce1aef --- /dev/null +++ b/pkg/icingadb/dump_signals.go @@ -0,0 +1,136 @@ +package icingadb + +import ( + "context" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" + "github.com/pkg/errors" + "go.uber.org/zap" + "sync" +) + +// DumpSignals reads dump signals from a Redis stream via Listen. +// Dump-done signals are passed on via Done channels, while InProgress must be checked for dump-wip signals. +type DumpSignals struct { + redis *icingaredis.Client + logger *logging.Logger + mutex sync.Mutex + doneCh map[string]chan struct{} + allDoneCh chan struct{} + inProgressCh chan struct{} +} + +// NewDumpSignals returns new DumpSignals. +func NewDumpSignals(redis *icingaredis.Client, logger *logging.Logger) *DumpSignals { + return &DumpSignals{ + redis: redis, + logger: logger, + doneCh: make(map[string]chan struct{}), + inProgressCh: make(chan struct{}), + } +} + +// Listen starts listening for dump signals in the icinga:dump Redis stream. When a done signal is received, this is +// signaled via the channels returned from the Done function. +// +// If a wip signal is received after a done signal was passed on via the Done function, this is signaled via the +// InProgress function and this function returns with err == nil. In this case, all other signals are invalidated. +// It is up to the caller to pass on this information, for example by cancelling derived contexts. +// +// This function may only be called once for each DumpSignals object. To listen for a new iteration of dump signals, a new +// DumpSignals instance must be created. +func (s *DumpSignals) Listen(ctx context.Context) error { + // Closing a channel twice results in a panic. This function takes a chan struct{} and closes it unless it is + // already closed. In this case it just does nothing. This function assumes that the channel is never written to + // and that there are no concurrent attempts to close the channel. + safeClose := func(ch chan struct{}) { + select { + case <-ch: + // Reading from a closed channel returns immediately, therefore don't close it again. + default: + close(ch) + } + } + + lastStreamId := "0-0" + anyDoneSent := false + + for { + if err := ctx.Err(); err != nil { + return errors.Wrap(err, "can't listen for dump signals") + } + + streams, err := s.redis.XReadUntilResult(ctx, &redis.XReadArgs{ + Streams: []string{"icinga:dump", lastStreamId}, + }) + if err != nil { + return errors.Wrap(err, "can't read dump signals") + } + + for _, entry := range streams[0].Messages { + lastStreamId = entry.ID + key := entry.Values["key"].(string) + done := entry.Values["state"].(string) == "done" + + s.logger.Debugw("Received dump signal from Redis", zap.String("key", key), zap.Bool("done", done)) + + if done { + if key == "*" { + if s.allDoneCh == nil { + s.mutex.Lock() + + // Set s.allDoneCh to signal for all future listeners that we've received an all-done signal. + s.allDoneCh = make(chan struct{}) + close(s.allDoneCh) + + // Notify all existing listeners. + for _, ch := range s.doneCh { + safeClose(ch) + } + + s.mutex.Unlock() + } + } else { + s.mutex.Lock() + if ch, ok := s.doneCh[key]; ok { + safeClose(ch) + } + s.mutex.Unlock() + } + anyDoneSent = true + } else if anyDoneSent { + // Received a wip signal after handing out any done signal via one of the channels returned by Done, + // signal that a new dump is in progress. This treats every state=wip as if it has key=*, which is the + // only key for which state=wip is currently sent by Icinga 2. + close(s.inProgressCh) + return nil + } + } + } +} + +// Done returns a channel that is closed when the given key receives a done dump signal. +func (s *DumpSignals) Done(key string) <-chan struct{} { + s.mutex.Lock() + defer s.mutex.Unlock() + + if s.allDoneCh != nil { + // An all done-signal was already received, don't care about individual key anymore. + return s.allDoneCh + } else if ch, ok := s.doneCh[key]; ok { + // Return existing wait channel for this key. + return ch + } else { + // First request for this key, create new wait channel. + ch = make(chan struct{}) + s.doneCh[key] = ch + return ch + } +} + +// InProgress returns a channel that is closed when a new dump is in progress after done signals were sent to channels +// returned by Done. +func (s *DumpSignals) InProgress() <-chan struct{} { + return s.inProgressCh +} diff --git a/pkg/icingadb/entitiesbyid.go b/pkg/icingadb/entitiesbyid.go new file mode 100644 index 0000000..b40050e --- /dev/null +++ b/pkg/icingadb/entitiesbyid.go @@ -0,0 +1,48 @@ +package icingadb + +import ( + "context" + "github.com/icinga/icingadb/pkg/contracts" +) + +// EntitiesById is a map of key-contracts.Entity pairs. +type EntitiesById map[string]contracts.Entity + +// Keys returns the keys. +func (ebi EntitiesById) Keys() []string { + keys := make([]string, 0, len(ebi)) + for k := range ebi { + keys = append(keys, k) + } + + return keys +} + +// IDs returns the contracts.ID of the entities. +func (ebi EntitiesById) IDs() []interface{} { + ids := make([]interface{}, 0, len(ebi)) + for _, v := range ebi { + ids = append(ids, v.(contracts.IDer).ID()) + } + + return ids +} + +// Entities streams the entities on a returned channel. +func (ebi EntitiesById) Entities(ctx context.Context) <-chan contracts.Entity { + entities := make(chan contracts.Entity) + + go func() { + defer close(entities) + + for _, v := range ebi { + select { + case <-ctx.Done(): + return + case entities <- v: + } + } + }() + + return entities +} diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go new file mode 100644 index 0000000..74d3b32 --- /dev/null +++ b/pkg/icingadb/ha.go @@ -0,0 +1,456 @@ +package icingadb + +import ( + "bytes" + "context" + "database/sql" + "encoding/hex" + "github.com/google/uuid" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/backoff" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/driver" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/icingaredis" + icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/retry" + "github.com/icinga/icingadb/pkg/types" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "go.uber.org/zap" + "sync" + "time" +) + +var timeout = 60 * time.Second + +type haState struct { + responsibleTsMilli int64 + responsible bool + otherResponsible bool +} + +// HA provides high availability and indicates whether a Takeover or Handover must be made. +type HA struct { + state com.Atomic[haState] + ctx context.Context + cancelCtx context.CancelFunc + instanceId types.Binary + db *DB + environmentMu sync.Mutex + environment *v1.Environment + heartbeat *icingaredis.Heartbeat + logger *logging.Logger + responsible bool + handover chan struct{} + takeover chan struct{} + done chan struct{} + errOnce sync.Once + errMu sync.Mutex + err error +} + +// NewHA returns a new HA and starts the controller loop. +func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger *logging.Logger) *HA { + ctx, cancelCtx := context.WithCancel(ctx) + + instanceId := uuid.New() + + ha := &HA{ + ctx: ctx, + cancelCtx: cancelCtx, + instanceId: instanceId[:], + db: db, + heartbeat: heartbeat, + logger: logger, + handover: make(chan struct{}), + takeover: make(chan struct{}), + done: make(chan struct{}), + } + + go ha.controller() + + return ha +} + +// Close shuts h down. +func (h *HA) Close(ctx context.Context) error { + // Cancel ctx. + h.cancelCtx() + // Wait until the controller loop ended. + <-h.Done() + // Remove our instance from the database. + h.removeInstance(ctx) + // And return an error, if any. + return h.Err() +} + +// Done returns a channel that's closed when the HA controller loop ended. +func (h *HA) Done() <-chan struct{} { + return h.done +} + +// Environment returns the current environment. +func (h *HA) Environment() *v1.Environment { + h.environmentMu.Lock() + defer h.environmentMu.Unlock() + + return h.environment +} + +// Err returns an error if Done has been closed and there is an error. Otherwise returns nil. +func (h *HA) Err() error { + h.errMu.Lock() + defer h.errMu.Unlock() + + return h.err +} + +// Handover returns a channel with which handovers are signaled. +func (h *HA) Handover() chan struct{} { + return h.handover +} + +// Takeover returns a channel with which takeovers are signaled. +func (h *HA) Takeover() chan struct{} { + return h.takeover +} + +// State returns the status quo. +func (h *HA) State() (responsibleTsMilli int64, responsible, otherResponsible bool) { + state, _ := h.state.Load() + return state.responsibleTsMilli, state.responsible, state.otherResponsible +} + +func (h *HA) abort(err error) { + h.errOnce.Do(func() { + h.errMu.Lock() + h.err = errors.Wrap(err, "HA aborted") + h.errMu.Unlock() + + h.cancelCtx() + }) +} + +// controller loop. +func (h *HA) controller() { + defer close(h.done) + + h.logger.Debugw("Starting HA", zap.String("instance_id", hex.EncodeToString(h.instanceId))) + + oldInstancesRemoved := false + + logTicker := time.NewTicker(time.Second * 60) + defer logTicker.Stop() + shouldLog := true + + for { + select { + case m := <-h.heartbeat.Events(): + if m != nil { + now := time.Now() + t, err := m.Stats().Time() + if err != nil { + h.abort(err) + } + tt := t.Time() + if tt.After(now.Add(1 * time.Second)) { + h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt)) + } + if tt.Before(now.Add(-1 * timeout)) { + h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt)) + h.signalHandover() + h.realizeLostHeartbeat() + continue + } + s, err := m.Stats().IcingaStatus() + if err != nil { + h.abort(err) + } + + envId, err := m.EnvironmentID() + if err != nil { + h.abort(err) + } + + if h.environment == nil || !bytes.Equal(h.environment.Id, envId) { + if h.environment != nil { + h.logger.Fatalw("Environment changed unexpectedly", + zap.String("current", h.environment.Id.String()), + zap.String("new", envId.String())) + } + + h.environmentMu.Lock() + h.environment = &v1.Environment{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{IdMeta: v1.IdMeta{ + Id: envId, + }}, + Name: types.String{ + NullString: sql.NullString{ + String: envId.String(), + Valid: true, + }, + }, + } + h.environmentMu.Unlock() + } + + select { + case <-logTicker.C: + shouldLog = true + default: + } + + var realizeCtx context.Context + var cancelRealizeCtx context.CancelFunc + if h.responsible { + realizeCtx, cancelRealizeCtx = context.WithDeadline(h.ctx, m.ExpiryTime()) + } else { + realizeCtx, cancelRealizeCtx = context.WithCancel(h.ctx) + } + err = h.realize(realizeCtx, s, t, envId, shouldLog) + cancelRealizeCtx() + if errors.Is(err, context.DeadlineExceeded) { + h.signalHandover() + continue + } + if err != nil { + h.abort(err) + } + + if !oldInstancesRemoved { + go h.removeOldInstances(s, envId) + oldInstancesRemoved = true + } + + shouldLog = false + } else { + h.logger.Error("Lost heartbeat") + h.signalHandover() + h.realizeLostHeartbeat() + } + case <-h.heartbeat.Done(): + if err := h.heartbeat.Err(); err != nil { + h.abort(err) + } + case <-h.ctx.Done(): + return + } + } +} + +func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, envId types.Binary, shouldLog bool) error { + var takeover, otherResponsible bool + + err := retry.WithBackoff( + ctx, + func(ctx context.Context) error { + takeover = false + otherResponsible = false + isoLvl := sql.LevelSerializable + selectLock := "" + + if h.db.DriverName() == driver.MySQL { + // The RDBMS may actually be a Percona XtraDB Cluster which doesn't + // support serializable transactions, but only their following equivalent: + isoLvl = sql.LevelRepeatableRead + selectLock = " LOCK IN SHARE MODE" + } + + tx, errBegin := h.db.BeginTxx(ctx, &sql.TxOptions{Isolation: isoLvl}) + if errBegin != nil { + return errors.Wrap(errBegin, "can't start transaction") + } + + query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance "+ + "WHERE environment_id = ? AND responsible = ? AND id <> ? AND heartbeat > ?") + selectLock + + instance := &v1.IcingadbInstance{} + + errQuery := tx.QueryRowxContext( + ctx, query, envId, "y", h.instanceId, time.Now().Add(-1*timeout).UnixMilli(), + ).StructScan(instance) + switch errQuery { + case nil: + otherResponsible = true + if shouldLog { + h.logger.Infow("Another instance is active", + zap.String("instance_id", instance.Id.String()), + zap.String("environment", envId.String()), + "heartbeat", instance.Heartbeat, + zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time()))) + } + case sql.ErrNoRows: + takeover = true + default: + return internal.CantPerformQuery(errQuery, query) + } + + i := v1.IcingadbInstance{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{ + Id: h.instanceId, + }, + }, + EnvironmentMeta: v1.EnvironmentMeta{ + EnvironmentId: envId, + }, + Heartbeat: *t, + Responsible: types.Bool{Bool: takeover || h.responsible, Valid: true}, + EndpointId: s.EndpointId, + Icinga2Version: s.Version, + Icinga2StartTime: s.ProgramStart, + Icinga2NotificationsEnabled: s.NotificationsEnabled, + Icinga2ActiveServiceChecksEnabled: s.ActiveServiceChecksEnabled, + Icinga2ActiveHostChecksEnabled: s.ActiveHostChecksEnabled, + Icinga2EventHandlersEnabled: s.EventHandlersEnabled, + Icinga2FlapDetectionEnabled: s.FlapDetectionEnabled, + Icinga2PerformanceDataEnabled: s.PerformanceDataEnabled, + } + + stmt, _ := h.db.BuildUpsertStmt(i) + if _, err := tx.NamedExecContext(ctx, stmt, i); err != nil { + return internal.CantPerformQuery(err, stmt) + } + + if takeover { + stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?") + _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId) + + if err != nil { + return internal.CantPerformQuery(err, stmt) + } + } + + if err := tx.Commit(); err != nil { + return errors.Wrap(err, "can't commit transaction") + } + + return nil + }, + retry.Retryable, + backoff.NewExponentialWithJitter(time.Millisecond*256, time.Second*3), + retry.Settings{ + OnError: func(_ time.Duration, attempt uint64, err, lastErr error) { + if lastErr == nil || err.Error() != lastErr.Error() { + log := h.logger.Debugw + if attempt > 2 { + log = h.logger.Infow + } + + log("Can't update or insert instance. Retrying", zap.Error(err), zap.Uint64("retry count", attempt)) + } + }, + }, + ) + if err != nil { + return err + } + + if takeover { + // Insert the environment after each heartbeat takeover if it does not already exist in the database + // as the environment may have changed, although this is likely to happen very rarely. + if err := h.insertEnvironment(); err != nil { + return errors.Wrap(err, "can't insert environment") + } + + h.signalTakeover() + } else if otherResponsible { + if state, _ := h.state.Load(); !state.otherResponsible { + state.otherResponsible = true + h.state.Store(state) + } + } + + return nil +} + +func (h *HA) realizeLostHeartbeat() { + stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE id = ?") + if _, err := h.db.ExecContext(h.ctx, stmt, "n", h.instanceId); err != nil && !utils.IsContextCanceled(err) { + h.logger.Warnw("Can't update instance", zap.Error(internal.CantPerformQuery(err, stmt))) + } +} + +// insertEnvironment inserts the environment from the specified state into the database if it does not already exist. +func (h *HA) insertEnvironment() error { + // Instead of checking whether the environment already exists, use an INSERT statement that does nothing if it does. + stmt, _ := h.db.BuildInsertIgnoreStmt(h.environment) + + if _, err := h.db.NamedExecContext(h.ctx, stmt, h.environment); err != nil { + return internal.CantPerformQuery(err, stmt) + } + + return nil +} + +func (h *HA) removeInstance(ctx context.Context) { + h.logger.Debugw("Removing our row from icingadb_instance", zap.String("instance_id", hex.EncodeToString(h.instanceId))) + // Intentionally not using h.ctx here as it's already cancelled. + query := h.db.Rebind("DELETE FROM icingadb_instance WHERE id = ?") + _, err := h.db.ExecContext(ctx, query, h.instanceId) + if err != nil { + h.logger.Warnw("Could not remove instance from database", zap.Error(err), zap.String("query", query)) + } +} + +func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binary) { + select { + case <-h.ctx.Done(): + return + case <-time.After(timeout): + query := h.db.Rebind("DELETE FROM icingadb_instance " + + "WHERE id <> ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?") + heartbeat := types.UnixMilli(time.Now().Add(-timeout)) + result, err := h.db.ExecContext(h.ctx, query, h.instanceId, envId, + s.EndpointId, heartbeat) + if err != nil { + h.logger.Errorw("Can't remove rows of old instances", zap.Error(err), + zap.String("query", query), + zap.String("id", h.instanceId.String()), zap.String("environment_id", envId.String()), + zap.String("endpoint_id", s.EndpointId.String()), zap.Time("heartbeat", heartbeat.Time())) + return + } + affected, err := result.RowsAffected() + if err != nil { + h.logger.Errorw("Can't get number of removed old instances", zap.Error(err)) + return + } + h.logger.Debugf("Removed %d old instances", affected) + } +} + +func (h *HA) signalHandover() { + if h.responsible { + h.state.Store(haState{ + responsibleTsMilli: time.Now().UnixMilli(), + responsible: false, + otherResponsible: false, + }) + + select { + case h.handover <- struct{}{}: + h.responsible = false + case <-h.ctx.Done(): + // Noop + } + } +} + +func (h *HA) signalTakeover() { + if !h.responsible { + h.state.Store(haState{ + responsibleTsMilli: time.Now().UnixMilli(), + responsible: true, + otherResponsible: false, + }) + + select { + case h.takeover <- struct{}{}: + h.responsible = true + case <-h.ctx.Done(): + // Noop + } + } +} diff --git a/pkg/icingadb/history/retention.go b/pkg/icingadb/history/retention.go new file mode 100644 index 0000000..ff217cd --- /dev/null +++ b/pkg/icingadb/history/retention.go @@ -0,0 +1,214 @@ +package history + +import ( + "context" + "fmt" + "github.com/icinga/icingadb/pkg/icingadb" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" + "github.com/pkg/errors" + "go.uber.org/zap" + "time" +) + +type RetentionType int + +const ( + RetentionHistory RetentionType = iota + RetentionSla +) + +type retentionStatement struct { + icingadb.CleanupStmt + RetentionType + Category string +} + +// RetentionStatements maps history categories with corresponding cleanup statements. +var RetentionStatements = []retentionStatement{{ + RetentionType: RetentionHistory, + Category: "acknowledgement", + CleanupStmt: icingadb.CleanupStmt{ + Table: "acknowledgement_history", + PK: "id", + Column: "clear_time", + }, +}, { + RetentionType: RetentionHistory, + Category: "comment", + CleanupStmt: icingadb.CleanupStmt{ + Table: "comment_history", + PK: "comment_id", + Column: "remove_time", + }, +}, { + RetentionType: RetentionHistory, + Category: "downtime", + CleanupStmt: icingadb.CleanupStmt{ + Table: "downtime_history", + PK: "downtime_id", + Column: "end_time", + }, +}, { + RetentionType: RetentionHistory, + Category: "flapping", + CleanupStmt: icingadb.CleanupStmt{ + Table: "flapping_history", + PK: "id", + Column: "end_time", + }, +}, { + RetentionType: RetentionHistory, + Category: "notification", + CleanupStmt: icingadb.CleanupStmt{ + Table: "notification_history", + PK: "id", + Column: "send_time", + }, +}, { + RetentionType: RetentionHistory, + Category: "state", + CleanupStmt: icingadb.CleanupStmt{ + Table: "state_history", + PK: "id", + Column: "event_time", + }, +}, { + RetentionType: RetentionSla, + Category: "sla_downtime", + CleanupStmt: icingadb.CleanupStmt{ + Table: "sla_history_downtime", + PK: "downtime_id", + Column: "downtime_end", + }, +}, { + RetentionType: RetentionSla, + Category: "sla_state", + CleanupStmt: icingadb.CleanupStmt{ + Table: "sla_history_state", + PK: "id", + Column: "event_time", + }, +}} + +// RetentionOptions defines the non-default mapping of history categories with their retention period in days. +type RetentionOptions map[string]uint64 + +// Validate checks constraints in the supplied retention options and +// returns an error if they are violated. +func (o RetentionOptions) Validate() error { + allowedCategories := make(map[string]struct{}) + for _, stmt := range RetentionStatements { + if stmt.RetentionType == RetentionHistory { + allowedCategories[stmt.Category] = struct{}{} + } + } + + for category := range o { + if _, ok := allowedCategories[category]; !ok { + return errors.Errorf("invalid key %s for history retention", category) + } + } + + return nil +} + +// Retention deletes rows from history tables that exceed their configured retention period. +type Retention struct { + db *icingadb.DB + logger *logging.Logger + historyDays uint64 + slaDays uint64 + interval time.Duration + count uint64 + options RetentionOptions +} + +// NewRetention returns a new Retention. +func NewRetention( + db *icingadb.DB, historyDays uint64, slaDays uint64, interval time.Duration, + count uint64, options RetentionOptions, logger *logging.Logger, +) *Retention { + return &Retention{ + db: db, + logger: logger, + historyDays: historyDays, + slaDays: slaDays, + interval: interval, + count: count, + options: options, + } +} + +// Start starts the retention. +func (r *Retention) Start(ctx context.Context) error { + ctx, cancelCtx := context.WithCancel(ctx) + defer cancelCtx() + + e, ok := v1.EnvironmentFromContext(ctx) + if !ok { + return errors.New("can't get environment from context") + } + + errs := make(chan error, 1) + + for _, stmt := range RetentionStatements { + var days uint64 + switch stmt.RetentionType { + case RetentionHistory: + if d, ok := r.options[stmt.Category]; ok { + days = d + } else { + days = r.historyDays + } + case RetentionSla: + days = r.slaDays + } + + if days < 1 { + r.logger.Debugf("Skipping history retention for category %s", stmt.Category) + continue + } + + r.logger.Debugw( + fmt.Sprintf("Starting history retention for category %s", stmt.Category), + zap.Uint64("count", r.count), + zap.Duration("interval", r.interval), + zap.Uint64("retention-days", days), + ) + + stmt := stmt + periodic.Start(ctx, r.interval, func(tick periodic.Tick) { + olderThan := tick.Time.AddDate(0, 0, -int(days)) + + r.logger.Debugf("Cleaning up historical data for category %s from table %s older than %s", + stmt.Category, stmt.Table, olderThan) + + deleted, err := r.db.CleanupOlderThan( + ctx, stmt.CleanupStmt, e.Id, r.count, olderThan, + icingadb.OnSuccessIncrement[struct{}](&telemetry.Stats.HistoryCleanup), + ) + if err != nil { + select { + case errs <- err: + case <-ctx.Done(): + } + + return + } + + if deleted > 0 { + r.logger.Infof("Removed %d old %s history items", deleted, stmt.Category) + } + }, periodic.Immediate()) + } + + select { + case err := <-errs: + return err + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/pkg/icingadb/history/sla.go b/pkg/icingadb/history/sla.go new file mode 100644 index 0000000..79d22c7 --- /dev/null +++ b/pkg/icingadb/history/sla.go @@ -0,0 +1,26 @@ +package history + +import ( + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/pkg/icingadb/v1/history" + "github.com/icinga/icingadb/pkg/structify" + "github.com/icinga/icingadb/pkg/types" + "reflect" +) + +var slaStateStructify = structify.MakeMapStructifier(reflect.TypeOf((*history.SlaHistoryState)(nil)).Elem(), "json") + +func stateHistoryToSlaEntity(entry redis.XMessage) ([]history.UpserterEntity, error) { + slaStateInterface, err := slaStateStructify(entry.Values) + if err != nil { + return nil, err + } + slaState := slaStateInterface.(*history.SlaHistoryState) + + if slaState.StateType != types.StateHard { + // only hard state changes are relevant for SLA history, discard all others + return nil, nil + } + + return []history.UpserterEntity{slaState}, nil +} diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go new file mode 100644 index 0000000..dc8bc61 --- /dev/null +++ b/pkg/icingadb/history/sync.go @@ -0,0 +1,383 @@ +package history + +import ( + "context" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/icingadb" + v1types "github.com/icinga/icingadb/pkg/icingadb/v1" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" + "github.com/icinga/icingadb/pkg/structify" + "github.com/icinga/icingadb/pkg/types" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + "reflect" + "sync" +) + +// Sync specifies the source and destination of a history sync. +type Sync struct { + db *icingadb.DB + redis *icingaredis.Client + logger *logging.Logger +} + +// NewSync creates a new Sync. +func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) *Sync { + return &Sync{ + db: db, + redis: redis, + logger: logger, + } +} + +// Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success. +func (s Sync) Sync(ctx context.Context) error { + g, ctx := errgroup.WithContext(ctx) + + for key, pipeline := range syncPipelines { + key := key + pipeline := pipeline + + s.logger.Debugf("Starting %s history sync", key) + + // The pipeline consists of n+2 stages connected sequentially using n+1 channels of type chan redis.XMessage, + // where n = len(pipeline), i.e. the number of actual sync stages. So the resulting pipeline looks like this: + // + // readFromRedis() Reads from redis and sends the history entries to the next stage + // ↓ ch[0] + // pipeline[0]() First actual sync stage, receives history items from the previous stage, syncs them + // and once completed, sends them off to the next stage. + // ↓ ch[1] + // ... There may be a different number of pipeline stages in between. + // ↓ ch[n-1] + // pipeline[n-1]() Last actual sync stage, once it's done, sends the history item to the final stage. + // ↓ ch[n] + // deleteFromRedis() After all stages have processed a message successfully, this final stage deletes + // the history entry from the Redis stream as it is now persisted in the database. + // + // Each history entry is processed by at most one stage at each time. Each state must forward the entry after + // it has processed it, even if the stage itself does not do anything with this specific entry. It should only + // forward the entry after it has completed its own sync so that later stages can rely on previous stages being + // executed successfully. + + ch := make([]chan redis.XMessage, len(pipeline)+1) + for i := range ch { + if i == 0 { + // Make the first channel buffered so that all items of one read iteration fit into the channel. + // This allows starting the next Redis XREAD right after the previous one has finished. + ch[i] = make(chan redis.XMessage, s.redis.Options.XReadCount) + } else { + ch[i] = make(chan redis.XMessage) + } + } + + g.Go(func() error { + return s.readFromRedis(ctx, key, ch[0]) + }) + + for i, stage := range pipeline { + i := i + stage := stage + + g.Go(func() error { + return stage(ctx, s, key, ch[i], ch[i+1]) + }) + } + + g.Go(func() error { + return s.deleteFromRedis(ctx, key, ch[len(pipeline)]) + }) + } + + return g.Wait() +} + +// readFromRedis is the first stage of the history sync pipeline. It reads the history stream from Redis +// and feeds the history entries into the next stage. +func (s Sync) readFromRedis(ctx context.Context, key string, output chan<- redis.XMessage) error { + defer close(output) + + xra := &redis.XReadArgs{ + Streams: []string{"icinga:history:stream:" + key, "0-0"}, + Count: int64(s.redis.Options.XReadCount), + } + + for { + streams, err := s.redis.XReadUntilResult(ctx, xra) + if err != nil { + return errors.Wrap(err, "can't read history") + } + + for _, stream := range streams { + for _, message := range stream.Messages { + xra.Streams[1] = message.ID + + select { + case output <- message: + case <-ctx.Done(): + return ctx.Err() + } + } + } + } +} + +// deleteFromRedis is the last stage of the history sync pipeline. It receives history entries from the second to last +// pipeline stage and then deletes the stream entry from Redis as all pipeline stages successfully processed the entry. +func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redis.XMessage) error { + var counter com.Counter + defer periodic.Start(ctx, s.logger.Interval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + s.logger.Infof("Synced %d %s history items", count, key) + } + }).Stop() + + bulks := com.Bulk(ctx, input, s.redis.Options.HScanCount, com.NeverSplit[redis.XMessage]) + stream := "icinga:history:stream:" + key + for { + select { + case bulk := <-bulks: + ids := make([]string, len(bulk)) + for i := range bulk { + ids[i] = bulk[i].ID + } + + cmd := s.redis.XDel(ctx, stream, ids...) + if _, err := cmd.Result(); err != nil { + return icingaredis.WrapCmdErr(cmd) + } + + counter.Add(uint64(len(ids))) + telemetry.Stats.History.Add(uint64(len(ids))) + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// stageFunc is a function type that represents a sync pipeline stage. It is called with a context (it should stop +// once that context is canceled), the Sync instance (for access to Redis, SQL database, logging), the key (information +// about which pipeline this function is running in, i.e. "notification"), an in channel for the stage to read history +// events from and an out channel to forward history entries to after processing them successfully. A stage function +// is supposed to forward each message from in to out, even if the event is not relevant for the current stage. On +// error conditions, the message must not be forwarded to the next stage so that the event is not deleted from Redis +// and can be processed at a later time. +type stageFunc func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error + +// writeOneEntityStage creates a stageFunc from a pointer to a struct implementing the v1.UpserterEntity interface. +// For each history event it receives, it parses that event into a new instance of that entity type and writes it to +// the database. It writes exactly one entity to the database for each history event. +func writeOneEntityStage(structPtr interface{}) stageFunc { + structifier := structify.MakeMapStructifier(reflect.TypeOf(structPtr).Elem(), "json") + + return writeMultiEntityStage(func(entry redis.XMessage) ([]v1.UpserterEntity, error) { + ptr, err := structifier(entry.Values) + if err != nil { + return nil, errors.Wrapf(err, "can't structify values %#v", entry.Values) + } + return []v1.UpserterEntity{ptr.(v1.UpserterEntity)}, nil + }) +} + +// writeMultiEntityStage creates a stageFunc from a function that takes a history event as an input and returns a +// (potentially empty) slice of v1.UpserterEntity instances that it then inserts into the database. +func writeMultiEntityStage(entryToEntities func(entry redis.XMessage) ([]v1.UpserterEntity, error)) stageFunc { + return func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error { + type State struct { + Message redis.XMessage // Original event from Redis. + Pending int // Number of pending entities. When reaching 0, the message is forwarded to out. + } + + bufSize := s.db.Options.MaxPlaceholdersPerStatement + insert := make(chan contracts.Entity, bufSize) // Events sent to the database for insertion. + inserted := make(chan contracts.Entity) // Events returned by the database after successful insertion. + skipped := make(chan redis.XMessage) // Events skipping insert/inserted (no entities generated). + state := make(map[contracts.Entity]*State) // Shared state between all entities created by one event. + var stateMu sync.Mutex // Synchronizes concurrent access to state. + + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + defer close(insert) + defer close(skipped) + + for { + select { + case e, ok := <-in: + if !ok { + return nil + } + + entities, err := entryToEntities(e) + if err != nil { + return err + } + + if len(entities) == 0 { + skipped <- e + } else { + st := &State{ + Message: e, + Pending: len(entities), + } + + stateMu.Lock() + for _, entity := range entities { + state[entity] = st + } + stateMu.Unlock() + + for _, entity := range entities { + select { + case insert <- entity: + case <-ctx.Done(): + return ctx.Err() + } + } + } + + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + g.Go(func() error { + defer close(inserted) + + return s.db.UpsertStreamed(ctx, insert, icingadb.OnSuccessSendTo[contracts.Entity](inserted)) + }) + + g.Go(func() error { + defer close(out) + + for { + select { + case e, ok := <-inserted: + if !ok { + return nil + } + + stateMu.Lock() + st := state[e] + delete(state, e) + stateMu.Unlock() + + st.Pending-- + if st.Pending == 0 { + select { + case out <- st.Message: + case <-ctx.Done(): + return ctx.Err() + } + } + + case m, ok := <-skipped: + if !ok { + return nil + } + + select { + case out <- m: + case <-ctx.Done(): + return ctx.Err() + } + + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + return g.Wait() + } +} + +// userNotificationStage is a specialized stageFunc that populates the user_notification_history table. It is executed +// on the notification history stream and uses the users_notified_ids attribute to create an entry in the +// user_notification_history relation table for each user ID. +func userNotificationStage(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error { + type NotificationHistory struct { + Id types.Binary `structify:"id"` + EnvironmentId types.Binary `structify:"environment_id"` + EndpointId types.Binary `structify:"endpoint_id"` + UserIds types.String `structify:"users_notified_ids"` + } + + structifier := structify.MakeMapStructifier(reflect.TypeOf((*NotificationHistory)(nil)).Elem(), "structify") + + return writeMultiEntityStage(func(entry redis.XMessage) ([]v1.UpserterEntity, error) { + rawNotificationHistory, err := structifier(entry.Values) + if err != nil { + return nil, err + } + notificationHistory := rawNotificationHistory.(*NotificationHistory) + + if !notificationHistory.UserIds.Valid { + return nil, nil + } + + var users []types.Binary + err = internal.UnmarshalJSON([]byte(notificationHistory.UserIds.String), &users) + if err != nil { + return nil, err + } + + var userNotifications []v1.UpserterEntity + + for _, user := range users { + userNotifications = append(userNotifications, &v1.UserNotificationHistory{ + EntityWithoutChecksum: v1types.EntityWithoutChecksum{ + IdMeta: v1types.IdMeta{ + Id: utils.Checksum(append(append([]byte(nil), notificationHistory.Id...), user...)), + }, + }, + EnvironmentMeta: v1types.EnvironmentMeta{ + EnvironmentId: notificationHistory.EnvironmentId, + }, + NotificationHistoryId: notificationHistory.Id, + UserId: user, + }) + } + + return userNotifications, nil + })(ctx, s, key, in, out) +} + +var syncPipelines = map[string][]stageFunc{ + "notification": { + writeOneEntityStage((*v1.NotificationHistory)(nil)), // notification_history + userNotificationStage, // user_notification_history (depends on notification_history) + writeOneEntityStage((*v1.HistoryNotification)(nil)), // history (depends on notification_history) + }, + "state": { + writeOneEntityStage((*v1.StateHistory)(nil)), // state_history + writeOneEntityStage((*v1.HistoryState)(nil)), // history (depends on state_history) + writeMultiEntityStage(stateHistoryToSlaEntity), // sla_history_state + }, + "downtime": { + writeOneEntityStage((*v1.DowntimeHistory)(nil)), // downtime_history + writeOneEntityStage((*v1.HistoryDowntime)(nil)), // history (depends on downtime_history) + writeOneEntityStage((*v1.SlaHistoryDowntime)(nil)), // sla_history_downtime + }, + "comment": { + writeOneEntityStage((*v1.CommentHistory)(nil)), // comment_history + writeOneEntityStage((*v1.HistoryComment)(nil)), // history (depends on comment_history) + }, + "flapping": { + writeOneEntityStage((*v1.FlappingHistory)(nil)), // flapping_history + writeOneEntityStage((*v1.HistoryFlapping)(nil)), // history (depends on flapping_history) + }, + "acknowledgement": { + writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history + writeOneEntityStage((*v1.HistoryAck)(nil)), // history (depends on acknowledgement_history) + }, +} diff --git a/pkg/icingadb/objectpacker/objectpacker.go b/pkg/icingadb/objectpacker/objectpacker.go new file mode 100644 index 0000000..9ddfdc8 --- /dev/null +++ b/pkg/icingadb/objectpacker/objectpacker.go @@ -0,0 +1,214 @@ +package objectpacker + +import ( + "bytes" + "encoding/binary" + "fmt" + "github.com/pkg/errors" + "io" + "io/ioutil" + "reflect" + "sort" +) + +// MustPackSlice calls PackAny using items and panics if there was an error. +func MustPackSlice(items ...interface{}) []byte { + var buf bytes.Buffer + + if err := PackAny(items, &buf); err != nil { + panic(err) + } + + return buf.Bytes() +} + +// PackAny packs any JSON-encodable value (ex. structs, also ignores interfaces like encoding.TextMarshaler) +// to a BSON-similar format suitable for consistent hashing. Spec: +// +// PackAny(nil) => 0x0 +// PackAny(false) => 0x1 +// PackAny(true) => 0x2 +// PackAny(float64(42)) => 0x3 ieee754_binary64_bigendian(42) +// PackAny("exämple") => 0x4 uint64_bigendian(len([]byte("exämple"))) []byte("exämple") +// PackAny([]uint8{0x42}) => 0x4 uint64_bigendian(len([]uint8{0x42})) []uint8{0x42} +// PackAny([1]uint8{0x42}) => 0x4 uint64_bigendian(len([1]uint8{0x42})) [1]uint8{0x42} +// PackAny([]T{x,y}) => 0x5 uint64_bigendian(len([]T{x,y})) PackAny(x) PackAny(y) +// PackAny(map[K]V{x:y}) => 0x6 uint64_bigendian(len(map[K]V{x:y})) len(map_key(x)) map_key(x) PackAny(y) +// PackAny((*T)(nil)) => 0x0 +// PackAny((*T)(0x42)) => PackAny(*(*T)(0x42)) +// PackAny(x) => panic() +// +// map_key([1]uint8{0x42}) => [1]uint8{0x42} +// map_key(x) => []byte(fmt.Sprint(x)) +func PackAny(in interface{}, out io.Writer) error { + return errors.Wrapf(packValue(reflect.ValueOf(in), out), "can't pack %#v", in) +} + +var tByte = reflect.TypeOf(byte(0)) +var tBytes = reflect.TypeOf([]uint8(nil)) + +// packValue does the actual job of packAny and just exists for recursion w/o unnecessary reflect.ValueOf calls. +func packValue(in reflect.Value, out io.Writer) error { + switch kind := in.Kind(); kind { + case reflect.Invalid: // nil + _, err := out.Write([]byte{0}) + return err + case reflect.Bool: + if in.Bool() { + _, err := out.Write([]byte{2}) + return err + } else { + _, err := out.Write([]byte{1}) + return err + } + case reflect.Float64: + if _, err := out.Write([]byte{3}); err != nil { + return err + } + + return binary.Write(out, binary.BigEndian, in.Float()) + case reflect.Array, reflect.Slice: + if typ := in.Type(); typ.Elem() == tByte { + if kind == reflect.Array { + if !in.CanAddr() { + vNewElem := reflect.New(typ).Elem() + vNewElem.Set(in) + in = vNewElem + } + + in = in.Slice(0, in.Len()) + } + + // Pack []byte as string, not array of numbers. + return packString(in.Convert(tBytes). // Support types.Binary + Interface().([]uint8), out) + } + + if _, err := out.Write([]byte{5}); err != nil { + return err + } + + l := in.Len() + if err := binary.Write(out, binary.BigEndian, uint64(l)); err != nil { + return err + } + + for i := 0; i < l; i++ { + if err := packValue(in.Index(i), out); err != nil { + return err + } + } + + // If there aren't any values to pack, ... + if l < 1 { + // ... create one and pack it - panics on disallowed type. + _ = packValue(reflect.Zero(in.Type().Elem()), ioutil.Discard) + } + + return nil + case reflect.Interface: + return packValue(in.Elem(), out) + case reflect.Map: + type kv struct { + key []byte + value reflect.Value + } + + if _, err := out.Write([]byte{6}); err != nil { + return err + } + + l := in.Len() + if err := binary.Write(out, binary.BigEndian, uint64(l)); err != nil { + return err + } + + sorted := make([]kv, 0, l) + + { + iter := in.MapRange() + for iter.Next() { + var packedKey []byte + if key := iter.Key(); key.Kind() == reflect.Array { + if typ := key.Type(); typ.Elem() == tByte { + if !key.CanAddr() { + vNewElem := reflect.New(typ).Elem() + vNewElem.Set(key) + key = vNewElem + } + + packedKey = key.Slice(0, key.Len()).Interface().([]byte) + } else { + // Not just stringify the key (below), but also pack it (here) - panics on disallowed type. + _ = packValue(iter.Key(), ioutil.Discard) + + packedKey = []byte(fmt.Sprint(key.Interface())) + } + } else { + // Not just stringify the key (below), but also pack it (here) - panics on disallowed type. + _ = packValue(iter.Key(), ioutil.Discard) + + packedKey = []byte(fmt.Sprint(key.Interface())) + } + + sorted = append(sorted, kv{packedKey, iter.Value()}) + } + } + + sort.Slice(sorted, func(i, j int) bool { return bytes.Compare(sorted[i].key, sorted[j].key) < 0 }) + + for _, kv := range sorted { + if err := binary.Write(out, binary.BigEndian, uint64(len(kv.key))); err != nil { + return err + } + + if _, err := out.Write(kv.key); err != nil { + return err + } + + if err := packValue(kv.value, out); err != nil { + return err + } + } + + // If there aren't any key-value pairs to pack, ... + if l < 1 { + typ := in.Type() + + // ... create one and pack it - panics on disallowed type. + _ = packValue(reflect.Zero(typ.Key()), ioutil.Discard) + _ = packValue(reflect.Zero(typ.Elem()), ioutil.Discard) + } + + return nil + case reflect.Ptr: + if in.IsNil() { + err := packValue(reflect.Value{}, out) + + // Create a fictive referenced value and pack it - panics on disallowed type. + _ = packValue(reflect.Zero(in.Type().Elem()), ioutil.Discard) + + return err + } else { + return packValue(in.Elem(), out) + } + case reflect.String: + return packString([]byte(in.String()), out) + default: + panic("bad type: " + in.Kind().String()) + } +} + +// packString deduplicates string packing of multiple locations in packValue. +func packString(in []byte, out io.Writer) error { + if _, err := out.Write([]byte{4}); err != nil { + return err + } + + if err := binary.Write(out, binary.BigEndian, uint64(len(in))); err != nil { + return err + } + + _, err := out.Write(in) + return err +} diff --git a/pkg/icingadb/objectpacker/objectpacker_test.go b/pkg/icingadb/objectpacker/objectpacker_test.go new file mode 100644 index 0000000..e377d77 --- /dev/null +++ b/pkg/icingadb/objectpacker/objectpacker_test.go @@ -0,0 +1,195 @@ +package objectpacker + +import ( + "bytes" + "github.com/icinga/icingadb/pkg/types" + "github.com/pkg/errors" + "io" + "testing" +) + +// limitedWriter allows writing a specific amount of data. +type limitedWriter struct { + // limit specifies how many bytes to allow to write. + limit int +} + +var _ io.Writer = (*limitedWriter)(nil) + +// Write returns io.EOF once lw.limit is exceeded, nil otherwise. +func (lw *limitedWriter) Write(p []byte) (n int, err error) { + if len(p) <= lw.limit { + lw.limit -= len(p) + return len(p), nil + } + + n = lw.limit + err = io.EOF + + lw.limit = 0 + return +} + +func TestLimitedWriter_Write(t *testing.T) { + assertLimitedWriter_Write(t, 3, []byte{1, 2}, 2, nil, 1) + assertLimitedWriter_Write(t, 3, []byte{1, 2, 3}, 3, nil, 0) + assertLimitedWriter_Write(t, 3, []byte{1, 2, 3, 4}, 3, io.EOF, 0) + assertLimitedWriter_Write(t, 0, []byte{1}, 0, io.EOF, 0) + assertLimitedWriter_Write(t, 0, nil, 0, nil, 0) +} + +func assertLimitedWriter_Write(t *testing.T, limitBefore int, p []byte, n int, err error, limitAfter int) { + t.Helper() + + lw := limitedWriter{limitBefore} + actualN, actualErr := lw.Write(p) + + if !errors.Is(actualErr, err) { + t.Errorf("_, err := (&limitedWriter{%d}).Write(%#v); err != %#v", limitBefore, p, err) + } + + if actualN != n { + t.Errorf("n, _ := (&limitedWriter{%d}).Write(%#v); n != %d", limitBefore, p, n) + } + + if lw.limit != limitAfter { + t.Errorf("lw := limitedWriter{%d}; lw.Write(%#v); lw.limit != %d", limitBefore, p, limitAfter) + } +} + +func TestPackAny(t *testing.T) { + assertPackAny(t, nil, []byte{0}) + assertPackAny(t, false, []byte{1}) + assertPackAny(t, true, []byte{2}) + + assertPackAnyPanic(t, -42, 0) + assertPackAnyPanic(t, int8(-42), 0) + assertPackAnyPanic(t, int16(-42), 0) + assertPackAnyPanic(t, int32(-42), 0) + assertPackAnyPanic(t, int64(-42), 0) + + assertPackAnyPanic(t, uint(42), 0) + assertPackAnyPanic(t, uint8(42), 0) + assertPackAnyPanic(t, uint16(42), 0) + assertPackAnyPanic(t, uint32(42), 0) + assertPackAnyPanic(t, uint64(42), 0) + assertPackAnyPanic(t, uintptr(42), 0) + + assertPackAnyPanic(t, float32(-42.5), 0) + assertPackAny(t, -42.5, []byte{3, 0xc0, 0x45, 0x40, 0, 0, 0, 0, 0}) + + assertPackAnyPanic(t, []struct{}(nil), 9) + assertPackAnyPanic(t, []struct{}{}, 9) + + assertPackAny(t, []interface{}{nil, true, -42.5}, []byte{ + 5, 0, 0, 0, 0, 0, 0, 0, 3, + 0, + 2, + 3, 0xc0, 0x45, 0x40, 0, 0, 0, 0, 0, + }) + + assertPackAny(t, []string{"", "a"}, []byte{ + 5, 0, 0, 0, 0, 0, 0, 0, 2, + 4, 0, 0, 0, 0, 0, 0, 0, 0, + 4, 0, 0, 0, 0, 0, 0, 0, 1, 'a', + }) + + assertPackAnyPanic(t, []interface{}{0 + 0i}, 9) + + assertPackAnyPanic(t, map[struct{}]struct{}(nil), 9) + assertPackAnyPanic(t, map[struct{}]struct{}{}, 9) + + assertPackAny(t, map[interface{}]interface{}{true: "", "nil": -42.5}, []byte{ + 6, 0, 0, 0, 0, 0, 0, 0, 2, + 0, 0, 0, 0, 0, 0, 0, 3, 'n', 'i', 'l', + 3, 0xc0, 0x45, 0x40, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 4, 't', 'r', 'u', 'e', + 4, 0, 0, 0, 0, 0, 0, 0, 0, + }) + + assertPackAny(t, map[string]float64{"": 42}, []byte{ + 6, 0, 0, 0, 0, 0, 0, 0, 1, + 0, 0, 0, 0, 0, 0, 0, 0, + 3, 0x40, 0x45, 0, 0, 0, 0, 0, 0, + }) + + assertPackAny(t, map[[1]byte]bool{{42}: true}, []byte{ + 6, 0, 0, 0, 0, 0, 0, 0, 1, + 0, 0, 0, 0, 0, 0, 0, 1, 42, + 2, + }) + + assertPackAnyPanic(t, map[struct{}]struct{}{{}: {}}, 9) + + assertPackAny(t, (*string)(nil), []byte{0}) + assertPackAnyPanic(t, (*int)(nil), 0) + assertPackAny(t, new(float64), []byte{3, 0, 0, 0, 0, 0, 0, 0, 0}) + + assertPackAny(t, "", []byte{4, 0, 0, 0, 0, 0, 0, 0, 0}) + assertPackAny(t, "a", []byte{4, 0, 0, 0, 0, 0, 0, 0, 1, 'a'}) + assertPackAny(t, "ä", []byte{4, 0, 0, 0, 0, 0, 0, 0, 2, 0xc3, 0xa4}) + + { + var binary [256]byte + for i := range binary { + binary[i] = byte(i) + } + + assertPackAny(t, binary, append([]byte{4, 0, 0, 0, 0, 0, 0, 1, 0}, binary[:]...)) + assertPackAny(t, binary[:], append([]byte{4, 0, 0, 0, 0, 0, 0, 1, 0}, binary[:]...)) + assertPackAny(t, types.Binary(binary[:]), append([]byte{4, 0, 0, 0, 0, 0, 0, 1, 0}, binary[:]...)) + } + + { + type myByte byte + assertPackAnyPanic(t, []myByte(nil), 9) + } + + assertPackAnyPanic(t, complex64(0+0i), 0) + assertPackAnyPanic(t, 0+0i, 0) + assertPackAnyPanic(t, make(chan struct{}), 0) + assertPackAnyPanic(t, func() {}, 0) + assertPackAnyPanic(t, struct{}{}, 0) + assertPackAnyPanic(t, uintptr(0), 0) +} + +func assertPackAny(t *testing.T, in interface{}, out []byte) { + t.Helper() + + { + buf := &bytes.Buffer{} + if err := PackAny(in, buf); err == nil { + if !bytes.Equal(buf.Bytes(), out) { + t.Errorf("buf := &bytes.Buffer{}; packAny(%#v, buf); !bytes.Equal(buf.Bytes(), %#v)", in, out) + } + } else { + t.Errorf("packAny(%#v, &bytes.Buffer{}) != nil", in) + } + } + + for i := 0; i < len(out); i++ { + if !errors.Is(PackAny(in, &limitedWriter{i}), io.EOF) { + t.Errorf("packAny(%#v, &limitedWriter{%d}) != io.EOF", in, i) + } + } +} + +func assertPackAnyPanic(t *testing.T, in interface{}, allowToWrite int) { + t.Helper() + + for i := 0; i < allowToWrite; i++ { + if !errors.Is(PackAny(in, &limitedWriter{i}), io.EOF) { + t.Errorf("packAny(%#v, &limitedWriter{%d}) != io.EOF", in, i) + } + } + + defer func() { + t.Helper() + + if r := recover(); r == nil { + t.Errorf("packAny(%#v, &limitedWriter{%d}) didn't panic", in, allowToWrite) + } + }() + + _ = PackAny(in, &limitedWriter{allowToWrite}) +} diff --git a/pkg/icingadb/overdue/get_overdues.lua b/pkg/icingadb/overdue/get_overdues.lua new file mode 100644 index 0000000..328209d --- /dev/null +++ b/pkg/icingadb/overdue/get_overdues.lua @@ -0,0 +1,30 @@ +-- get_overdues.lua takes the following KEYS: +-- * either icinga:nextupdate:host or icinga:nextupdate:service +-- * either icingadb:overdue:host or icingadb:overdue:service +-- * a random one +-- +-- It takes the following ARGV: +-- * the current date and time as *nix timestamp float in seconds +-- +-- It returns the following: +-- * overdue monitored objects not yet marked overdue +-- * not overdue monitored objects not yet unmarked overdue + +local icingaNextupdate = KEYS[1] +local icingadbOverdue = KEYS[2] +local tempOverdue = KEYS[3] +local now = ARGV[1] + +redis.call('DEL', tempOverdue) + +local zrbs = redis.call('ZRANGEBYSCORE', icingaNextupdate, '-inf', '(' .. now) +for i = 1, #zrbs do + redis.call('SADD', tempOverdue, zrbs[i]) +end +zrbs = nil + +local res = { redis.call('SDIFF', tempOverdue, icingadbOverdue), redis.call('SDIFF', icingadbOverdue, tempOverdue) } + +redis.call('DEL', tempOverdue) + +return res diff --git a/pkg/icingadb/overdue/sync.go b/pkg/icingadb/overdue/sync.go new file mode 100644 index 0000000..5cd4d67 --- /dev/null +++ b/pkg/icingadb/overdue/sync.go @@ -0,0 +1,252 @@ +package overdue + +import ( + "context" + _ "embed" + "fmt" + "github.com/go-redis/redis/v8" + "github.com/google/uuid" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/icingadb" + "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/icingadb/v1/overdue" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + "regexp" + "strconv" + "strings" + "time" +) + +// Sync specifies the source and destination of an overdue sync. +type Sync struct { + db *icingadb.DB + redis *icingaredis.Client + logger *logging.Logger +} + +// NewSync creates a new Sync. +func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) *Sync { + return &Sync{ + db: db, + redis: redis, + logger: logger, + } +} + +// factory abstracts overdue.NewHostState and overdue.NewServiceState. +type factory = func(id string, overdue bool) (contracts.Entity, error) + +// Sync synchronizes Redis overdue sets from s.redis to s.db. +func (s Sync) Sync(ctx context.Context) error { + { + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + return s.initSync(ctx, "host") + }) + + g.Go(func() error { + return s.initSync(ctx, "service") + }) + + if err := g.Wait(); err != nil { + return errors.Wrap(err, "can't sync overdue indicators") + } + } + + g, ctx := errgroup.WithContext(ctx) + + var hostCounter com.Counter + defer s.log(ctx, "host", &hostCounter).Stop() + + var serviceCounter com.Counter + defer s.log(ctx, "service", &serviceCounter).Stop() + + g.Go(func() error { + return s.sync(ctx, "host", overdue.NewHostState, &hostCounter) + }) + + g.Go(func() error { + return s.sync(ctx, "service", overdue.NewServiceState, &serviceCounter) + }) + + return g.Wait() +} + +// initSync initializes icingadb:overdue:objectType from the database. +func (s Sync) initSync(ctx context.Context, objectType string) error { + s.logger.Debugf("Refreshing already synced %s overdue indicators", objectType) + start := time.Now() + + var rows []v1.IdMeta + query := fmt.Sprintf("SELECT id FROM %s_state WHERE is_overdue='y'", objectType) + + if err := s.db.SelectContext(ctx, &rows, query); err != nil { + return internal.CantPerformQuery(err, query) + } + + _, err := s.redis.Pipelined(ctx, func(pipe redis.Pipeliner) error { + key := "icingadb:overdue:" + objectType + pipe.Del(ctx, key) + + var ids []interface{} + for _, row := range rows { + ids = append(ids, row.Id.String()) + if len(ids) == 100 { + pipe.SAdd(ctx, key, ids...) + ids = nil + } + } + + if len(ids) > 0 { + pipe.SAdd(ctx, key, ids...) + } + + return nil + }) + + if err == nil { + s.logger.Debugf( + "Refreshing %d already synced %s overdue indicators took %s", + len(rows), objectType, time.Since(start), + ) + } else { + err = errors.Wrap(err, "can't execute Redis pipeline") + } + + return err +} + +// log periodically logs sync's workload. +func (s Sync) log(ctx context.Context, objectType string, counter *com.Counter) periodic.Stopper { + return periodic.Start(ctx, s.logger.Interval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + s.logger.Infof("Synced %d %s overdue indicators", count, objectType) + } + }) +} + +//go:embed get_overdues.lua +var getOverduesLua string + +var luaGetOverdues = redis.NewScript(strings.TrimSpace( + regexp.MustCompile(`(?m)^--.*?$`).ReplaceAllString(getOverduesLua, ""), +)) + +// sync synchronizes Redis overdue sets from s.redis to s.db for objectType. +func (s Sync) sync(ctx context.Context, objectType string, factory factory, counter *com.Counter) error { + s.logger.Debugf("Syncing %s overdue indicators", objectType) + + keys := [3]string{"icinga:nextupdate:" + objectType, "icingadb:overdue:" + objectType, ""} + if rand, err := uuid.NewRandom(); err == nil { + keys[2] = rand.String() + } else { + return errors.Wrap(err, "can't create random UUID") + } + + const period = 2 * time.Second + periodically := time.NewTicker(period) + defer periodically.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-periodically.C: + overdues, err := luaGetOverdues.Run( + ctx, s.redis, keys[:], strconv.FormatInt(time.Now().Unix(), 10), + ).Result() + if err != nil { + return errors.Wrap(err, "can't execute Redis script") + } + + root := overdues.([]interface{}) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + return s.updateOverdue(ctx, objectType, factory, counter, root[0].([]interface{}), true) + }) + + g.Go(func() error { + return s.updateOverdue(ctx, objectType, factory, counter, root[1].([]interface{}), false) + }) + + if err := g.Wait(); err != nil { + return errors.Wrap(err, "can't update overdue indicators") + } + + // For the case that syncing has taken some time, delay the next sync. + periodically.Reset(period) + + select { + case <-periodically.C: // Clean up periodically.C after reset... + default: // ... unless it's already clean. + } + } + } +} + +// updateOverdue sets objectType_state#is_overdue for ids to overdue +// and updates icingadb:overdue:objectType respectively. +func (s Sync) updateOverdue( + ctx context.Context, objectType string, factory factory, counter *com.Counter, ids []interface{}, overdue bool, +) error { + if len(ids) < 1 { + return nil + } + + if err := s.updateDb(ctx, factory, ids, overdue); err != nil { + return errors.Wrap(err, "can't update overdue indicators") + } + + counter.Add(uint64(len(ids))) + telemetry.Stats.Overdue.Add(uint64(len(ids))) + + var op func(ctx context.Context, key string, members ...interface{}) *redis.IntCmd + if overdue { + op = s.redis.SAdd + } else { + op = s.redis.SRem + } + + _, err := op(ctx, "icingadb:overdue:"+objectType, ids...).Result() + return err +} + +// updateDb sets objectType_state#is_overdue for ids to overdue. +func (s Sync) updateDb(ctx context.Context, factory factory, ids []interface{}, overdue bool) error { + g, ctx := errgroup.WithContext(ctx) + ch := make(chan contracts.Entity, 1<<10) + + g.Go(func() error { + defer close(ch) + + for _, id := range ids { + e, err := factory(id.(string), overdue) + if err != nil { + return errors.Wrap(err, "can't create entity") + } + + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- e: + } + } + + return nil + }) + + g.Go(func() error { + return s.db.UpdateStreamed(ctx, ch) + }) + + return g.Wait() +} diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go new file mode 100644 index 0000000..dfee9c0 --- /dev/null +++ b/pkg/icingadb/runtime_updates.go @@ -0,0 +1,368 @@ +package icingadb + +import ( + "context" + "fmt" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/common" + "github.com/icinga/icingadb/pkg/contracts" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" + "github.com/icinga/icingadb/pkg/structify" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" + "reflect" + "strconv" + "strings" + "sync" +) + +// RuntimeUpdates specifies the source and destination of runtime updates. +type RuntimeUpdates struct { + db *DB + redis *icingaredis.Client + logger *logging.Logger +} + +// NewRuntimeUpdates creates a new RuntimeUpdates. +func NewRuntimeUpdates(db *DB, redis *icingaredis.Client, logger *logging.Logger) *RuntimeUpdates { + return &RuntimeUpdates{ + db: db, + redis: redis, + logger: logger, + } +} + +// ClearStreams returns the stream key to ID mapping of the runtime update streams +// for later use in Sync and clears the streams themselves. +func (r *RuntimeUpdates) ClearStreams(ctx context.Context) (config, state icingaredis.Streams, err error) { + config = icingaredis.Streams{"icinga:runtime": "0-0"} + state = icingaredis.Streams{"icinga:runtime:state": "0-0"} + + var keys []string + for _, streams := range [...]icingaredis.Streams{config, state} { + for key := range streams { + keys = append(keys, key) + } + } + + err = icingaredis.WrapCmdErr(r.redis.Del(ctx, keys...)) + return +} + +// Sync synchronizes runtime update streams from s.redis to s.db and deletes the original data on success. +// Note that Sync must be only be called configuration synchronization has been completed. +// allowParallel allows synchronizing out of order (not FIFO). +func (r *RuntimeUpdates) Sync( + ctx context.Context, factoryFuncs []contracts.EntityFactoryFunc, streams icingaredis.Streams, allowParallel bool, +) error { + g, ctx := errgroup.WithContext(ctx) + + updateMessagesByKey := make(map[string]chan<- redis.XMessage) + + for _, factoryFunc := range factoryFuncs { + s := common.NewSyncSubject(factoryFunc) + stat := getCounterForEntity(s.Entity()) + + updateMessages := make(chan redis.XMessage, r.redis.Options.XReadCount) + upsertEntities := make(chan contracts.Entity, r.redis.Options.XReadCount) + deleteIds := make(chan interface{}, r.redis.Options.XReadCount) + + var upsertedFifo chan contracts.Entity + var deletedFifo chan interface{} + var upsertCount int + var deleteCount int + upsertStmt, upsertPlaceholders := r.db.BuildUpsertStmt(s.Entity()) + if !allowParallel { + upsertedFifo = make(chan contracts.Entity, 1) + deletedFifo = make(chan interface{}, 1) + upsertCount = 1 + deleteCount = 1 + } else { + upsertCount = r.db.BatchSizeByPlaceholders(upsertPlaceholders) + deleteCount = r.db.Options.MaxPlaceholdersPerStatement + } + + updateMessagesByKey[fmt.Sprintf("icinga:%s", utils.Key(s.Name(), ':'))] = updateMessages + + r.logger.Debugf("Syncing runtime updates of %s", s.Name()) + + g.Go(structifyStream( + ctx, updateMessages, upsertEntities, upsertedFifo, deleteIds, deletedFifo, + structify.MakeMapStructifier(reflect.TypeOf(s.Entity()).Elem(), "json"), + )) + + g.Go(func() error { + var counter com.Counter + defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + r.logger.Infof("Upserted %d %s items", count, s.Name()) + } + }).Stop() + + // Updates must be executed in order, ensure this by using a semaphore with maximum 1. + sem := semaphore.NewWeighted(1) + + onSuccess := []OnSuccess[contracts.Entity]{ + OnSuccessIncrement[contracts.Entity](&counter), OnSuccessIncrement[contracts.Entity](stat), + } + if !allowParallel { + onSuccess = append(onSuccess, OnSuccessSendTo(upsertedFifo)) + } + + return r.db.NamedBulkExec( + ctx, upsertStmt, upsertCount, sem, upsertEntities, com.SplitOnDupId[contracts.Entity], onSuccess..., + ) + }) + + g.Go(func() error { + var counter com.Counter + defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + r.logger.Infof("Deleted %d %s items", count, s.Name()) + } + }).Stop() + + sem := r.db.GetSemaphoreForTable(utils.TableName(s.Entity())) + + onSuccess := []OnSuccess[any]{OnSuccessIncrement[any](&counter), OnSuccessIncrement[any](stat)} + if !allowParallel { + onSuccess = append(onSuccess, OnSuccessSendTo(deletedFifo)) + } + + return r.db.BulkExec(ctx, r.db.BuildDeleteStmt(s.Entity()), deleteCount, sem, deleteIds, onSuccess...) + }) + } + + // customvar and customvar_flat sync. + { + updateMessages := make(chan redis.XMessage, r.redis.Options.XReadCount) + upsertEntities := make(chan contracts.Entity, r.redis.Options.XReadCount) + deleteIds := make(chan interface{}, r.redis.Options.XReadCount) + + cv := common.NewSyncSubject(v1.NewCustomvar) + cvFlat := common.NewSyncSubject(v1.NewCustomvarFlat) + + r.logger.Debug("Syncing runtime updates of " + cv.Name()) + r.logger.Debug("Syncing runtime updates of " + cvFlat.Name()) + + updateMessagesByKey["icinga:"+utils.Key(cv.Name(), ':')] = updateMessages + g.Go(structifyStream( + ctx, updateMessages, upsertEntities, nil, deleteIds, nil, + structify.MakeMapStructifier(reflect.TypeOf(cv.Entity()).Elem(), "json"), + )) + + customvars, flatCustomvars, errs := v1.ExpandCustomvars(ctx, upsertEntities) + com.ErrgroupReceive(g, errs) + + cvStmt, cvPlaceholders := r.db.BuildUpsertStmt(cv.Entity()) + cvCount := r.db.BatchSizeByPlaceholders(cvPlaceholders) + g.Go(func() error { + var counter com.Counter + defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + r.logger.Infof("Upserted %d %s items", count, cv.Name()) + } + }).Stop() + + // Updates must be executed in order, ensure this by using a semaphore with maximum 1. + sem := semaphore.NewWeighted(1) + + return r.db.NamedBulkExec( + ctx, cvStmt, cvCount, sem, customvars, com.SplitOnDupId[contracts.Entity], + OnSuccessIncrement[contracts.Entity](&counter), + OnSuccessIncrement[contracts.Entity](&telemetry.Stats.Config), + ) + }) + + cvFlatStmt, cvFlatPlaceholders := r.db.BuildUpsertStmt(cvFlat.Entity()) + cvFlatCount := r.db.BatchSizeByPlaceholders(cvFlatPlaceholders) + g.Go(func() error { + var counter com.Counter + defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + r.logger.Infof("Upserted %d %s items", count, cvFlat.Name()) + } + }).Stop() + + // Updates must be executed in order, ensure this by using a semaphore with maximum 1. + sem := semaphore.NewWeighted(1) + + return r.db.NamedBulkExec( + ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, + com.SplitOnDupId[contracts.Entity], OnSuccessIncrement[contracts.Entity](&counter), + OnSuccessIncrement[contracts.Entity](&telemetry.Stats.Config), + ) + }) + + g.Go(func() error { + var once sync.Once + for { + select { + case _, ok := <-deleteIds: + if !ok { + return nil + } + // Icinga 2 does not send custom var delete events. + once.Do(func() { + r.logger.DPanic("received unexpected custom var delete event") + }) + case <-ctx.Done(): + return ctx.Err() + } + } + }) + } + + g.Go(r.xRead(ctx, updateMessagesByKey, streams)) + + return g.Wait() +} + +// xRead reads from the runtime update streams and sends the data to the corresponding updateMessages channel. +// The updateMessages channel is determined by a "redis_key" on each redis message. +func (r *RuntimeUpdates) xRead(ctx context.Context, updateMessagesByKey map[string]chan<- redis.XMessage, streams icingaredis.Streams) func() error { + return func() error { + defer func() { + for _, updateMessages := range updateMessagesByKey { + close(updateMessages) + } + }() + + for { + rs, err := r.redis.XReadUntilResult(ctx, &redis.XReadArgs{ + Streams: streams.Option(), + Count: int64(r.redis.Options.XReadCount), + }) + if err != nil { + return errors.Wrap(err, "can't read runtime updates") + } + + pipe := r.redis.Pipeline() + for _, stream := range rs { + var id string + + for _, message := range stream.Messages { + id = message.ID + + redisKey := message.Values["redis_key"] + if redisKey == nil { + return errors.Errorf("stream message missing 'redis_key' key: %v", message.Values) + } + + updateMessages := updateMessagesByKey[redisKey.(string)] + if updateMessages == nil { + return errors.Errorf("no object type for redis key %s found", redisKey) + } + + select { + case updateMessages <- message: + case <-ctx.Done(): + return ctx.Err() + } + } + + tsAndSerial := strings.Split(id, "-") + if s, err := strconv.ParseUint(tsAndSerial[1], 10, 64); err == nil { + tsAndSerial[1] = strconv.FormatUint(s+1, 10) + } + + pipe.XTrimMinIDApprox(ctx, stream.Stream, strings.Join(tsAndSerial, "-"), 0) + streams[stream.Stream] = id + } + + if cmds, err := pipe.Exec(ctx); err != nil { + r.logger.Errorw("Can't execute Redis pipeline", zap.Error(errors.WithStack(err))) + } else { + for _, cmd := range cmds { + if cmd.Err() != nil { + r.logger.Errorw("Can't trim runtime updates stream", zap.Error(icingaredis.WrapCmdErr(cmd))) + } + } + } + } + } +} + +// structifyStream gets Redis stream messages (redis.XMessage) via the updateMessages channel and converts +// those messages into Icinga DB entities (contracts.Entity) using the provided structifier. +// Converted entities are inserted into the upsertEntities or deleteIds channel depending on the "runtime_type" message field. +func structifyStream( + ctx context.Context, updateMessages <-chan redis.XMessage, upsertEntities, upserted chan contracts.Entity, + deleteIds, deleted chan interface{}, structifier structify.MapStructifier, +) func() error { + if upserted == nil { + upserted = make(chan contracts.Entity) + close(upserted) + } + + if deleted == nil { + deleted = make(chan interface{}) + close(deleted) + } + + return func() error { + defer func() { + close(upsertEntities) + close(deleteIds) + }() + + for { + select { + case message, ok := <-updateMessages: + if !ok { + return nil + } + + ptr, err := structifier(message.Values) + if err != nil { + return errors.Wrapf(err, "can't structify values %#v", message.Values) + } + + entity := ptr.(contracts.Entity) + + runtimeType := message.Values["runtime_type"] + if runtimeType == nil { + return errors.Errorf("stream message missing 'runtime_type' key: %v", message.Values) + } + + if runtimeType == "upsert" { + select { + case upsertEntities <- entity: + case <-ctx.Done(): + return ctx.Err() + } + + select { + case <-upserted: + case <-ctx.Done(): + return ctx.Err() + } + } else if runtimeType == "delete" { + select { + case deleteIds <- entity.ID(): + case <-ctx.Done(): + return ctx.Err() + } + + select { + case <-deleted: + case <-ctx.Done(): + return ctx.Err() + } + } else { + return errors.Errorf("invalid runtime type: %s", runtimeType) + } + case <-ctx.Done(): + return ctx.Err() + } + } + } +} diff --git a/pkg/icingadb/scoped_entity.go b/pkg/icingadb/scoped_entity.go new file mode 100644 index 0000000..7c1688c --- /dev/null +++ b/pkg/icingadb/scoped_entity.go @@ -0,0 +1,32 @@ +package icingadb + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/utils" +) + +// ScopedEntity combines an entity and a scope that specifies +// the WHERE conditions that entities of the +// enclosed entity type must satisfy in order to be SELECTed. +type ScopedEntity struct { + contracts.Entity + scope interface{} +} + +// Scope implements the contracts.Scoper interface. +func (e ScopedEntity) Scope() interface{} { + return e.scope +} + +// TableName implements the contracts.TableNamer interface. +func (e ScopedEntity) TableName() string { + return utils.TableName(e.Entity) +} + +// NewScopedEntity returns a new ScopedEntity. +func NewScopedEntity(entity contracts.Entity, scope interface{}) *ScopedEntity { + return &ScopedEntity{ + Entity: entity, + scope: scope, + } +} diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go new file mode 100644 index 0000000..790f11e --- /dev/null +++ b/pkg/icingadb/sync.go @@ -0,0 +1,221 @@ +package icingadb + +import ( + "context" + "fmt" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/common" + "github.com/icinga/icingadb/pkg/contracts" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "runtime" + "time" +) + +// Sync implements a rendezvous point for Icinga DB and Redis to synchronize their entities. +type Sync struct { + db *DB + redis *icingaredis.Client + logger *logging.Logger +} + +// NewSync returns a new Sync. +func NewSync(db *DB, redis *icingaredis.Client, logger *logging.Logger) *Sync { + return &Sync{ + db: db, + redis: redis, + logger: logger, + } +} + +// SyncAfterDump waits for a config dump to finish (using the dump parameter) and then starts a sync for the given +// sync subject using the Sync function. +func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, dump *DumpSignals) error { + typeName := utils.Name(subject.Entity()) + key := "icinga:" + utils.Key(typeName, ':') + + startTime := time.Now() + logTicker := time.NewTicker(s.logger.Interval()) + defer logTicker.Stop() + loggedWaiting := false + + for { + select { + case <-logTicker.C: + s.logger.Infow("Waiting for dump done signal", + zap.String("type", typeName), + zap.String("key", key), + zap.Duration("duration", time.Since(startTime))) + loggedWaiting = true + case <-dump.Done(key): + logFn := s.logger.Debugw + if loggedWaiting { + logFn = s.logger.Infow + } + logFn("Starting sync", + zap.String("type", typeName), + zap.String("key", key), + zap.Duration("waited", time.Since(startTime))) + return s.Sync(ctx, subject) + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// Sync synchronizes entities between Icinga DB and Redis created with the specified sync subject. +// This function does not respect dump signals. For this, use SyncAfterDump. +func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error { + g, ctx := errgroup.WithContext(ctx) + + desired, redisErrs := s.redis.YieldAll(ctx, subject) + // Let errors from Redis cancel our group. + com.ErrgroupReceive(g, redisErrs) + + e, ok := v1.EnvironmentFromContext(ctx) + if !ok { + return errors.New("can't get environment from context") + } + + actual, dbErrs := s.db.YieldAll( + ctx, subject.FactoryForDelta(), + s.db.BuildSelectStmt(NewScopedEntity(subject.Entity(), e.Meta()), subject.Entity().Fingerprint()), e.Meta(), + ) + // Let errors from DB cancel our group. + com.ErrgroupReceive(g, dbErrs) + + g.Go(func() error { + return s.ApplyDelta(ctx, NewDelta(ctx, actual, desired, subject, s.logger)) + }) + + return g.Wait() +} + +// ApplyDelta applies all changes from Delta to the database. +func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { + if err := delta.Wait(); err != nil { + return errors.Wrap(err, "can't calculate delta") + } + + g, ctx := errgroup.WithContext(ctx) + stat := getCounterForEntity(delta.Subject.Entity()) + + // Create + if len(delta.Create) > 0 { + s.logger.Infof("Inserting %d items of type %s", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) + var entities <-chan contracts.Entity + if delta.Subject.WithChecksum() { + pairs, errs := s.redis.HMYield( + ctx, + fmt.Sprintf("icinga:%s", utils.Key(utils.Name(delta.Subject.Entity()), ':')), + delta.Create.Keys()...) + // Let errors from Redis cancel our group. + com.ErrgroupReceive(g, errs) + + entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, delta.Subject.Factory(), pairs, runtime.NumCPU()) + // Let errors from CreateEntities cancel our group. + com.ErrgroupReceive(g, errs) + entities, errs = icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Create, runtime.NumCPU()) + // Let errors from SetChecksums cancel our group. + com.ErrgroupReceive(g, errs) + } else { + entities = delta.Create.Entities(ctx) + } + + g.Go(func() error { + return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat)) + }) + } + + // Update + if len(delta.Update) > 0 { + s.logger.Infof("Updating %d items of type %s", len(delta.Update), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) + pairs, errs := s.redis.HMYield( + ctx, + fmt.Sprintf("icinga:%s", utils.Key(utils.Name(delta.Subject.Entity()), ':')), + delta.Update.Keys()...) + // Let errors from Redis cancel our group. + com.ErrgroupReceive(g, errs) + + entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, delta.Subject.Factory(), pairs, runtime.NumCPU()) + // Let errors from CreateEntities cancel our group. + com.ErrgroupReceive(g, errs) + entities, errs := icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Update, runtime.NumCPU()) + // Let errors from SetChecksums cancel our group. + com.ErrgroupReceive(g, errs) + + g.Go(func() error { + // Using upsert here on purpose as this is the fastest way to do bulk updates. + // However, there is a risk that errors in the sync implementation could silently insert new rows. + return s.db.UpsertStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat)) + }) + } + + // Delete + if len(delta.Delete) > 0 { + s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) + g.Go(func() error { + return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), OnSuccessIncrement[any](stat)) + }) + } + + return g.Wait() +} + +// SyncCustomvars synchronizes customvar and customvar_flat. +func (s Sync) SyncCustomvars(ctx context.Context) error { + e, ok := v1.EnvironmentFromContext(ctx) + if !ok { + return errors.New("can't get environment from context") + } + + g, ctx := errgroup.WithContext(ctx) + + cv := common.NewSyncSubject(v1.NewCustomvar) + + cvs, errs := s.redis.YieldAll(ctx, cv) + com.ErrgroupReceive(g, errs) + + desiredCvs, desiredFlatCvs, errs := v1.ExpandCustomvars(ctx, cvs) + com.ErrgroupReceive(g, errs) + + actualCvs, errs := s.db.YieldAll( + ctx, cv.FactoryForDelta(), + s.db.BuildSelectStmt(NewScopedEntity(cv.Entity(), e.Meta()), cv.Entity().Fingerprint()), e.Meta(), + ) + com.ErrgroupReceive(g, errs) + + g.Go(func() error { + return s.ApplyDelta(ctx, NewDelta(ctx, actualCvs, desiredCvs, cv, s.logger)) + }) + + flatCv := common.NewSyncSubject(v1.NewCustomvarFlat) + + actualFlatCvs, errs := s.db.YieldAll( + ctx, flatCv.FactoryForDelta(), + s.db.BuildSelectStmt(NewScopedEntity(flatCv.Entity(), e.Meta()), flatCv.Entity().Fingerprint()), e.Meta(), + ) + com.ErrgroupReceive(g, errs) + + g.Go(func() error { + return s.ApplyDelta(ctx, NewDelta(ctx, actualFlatCvs, desiredFlatCvs, flatCv, s.logger)) + }) + + return g.Wait() +} + +// getCounterForEntity returns the appropriate counter (config/state) from telemetry.Stats for e. +func getCounterForEntity(e contracts.Entity) *com.Counter { + switch e.(type) { + case *v1.HostState, *v1.ServiceState: + return &telemetry.Stats.State + default: + return &telemetry.Stats.Config + } +} diff --git a/pkg/icingadb/v1/checkable.go b/pkg/icingadb/v1/checkable.go new file mode 100644 index 0000000..dbb114c --- /dev/null +++ b/pkg/icingadb/v1/checkable.go @@ -0,0 +1,46 @@ +package v1 + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type Checkable struct { + EntityWithChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + NameCiMeta `json:",inline"` + ActionUrlId types.Binary `json:"action_url_id"` + ActiveChecksEnabled types.Bool `json:"active_checks_enabled"` + CheckInterval float64 `json:"check_interval"` + CheckTimeperiodName string `json:"check_timeperiod_name"` + CheckTimeperiodId types.Binary `json:"check_timeperiod_id"` + CheckRetryInterval float64 `json:"check_retry_interval"` + CheckTimeout float64 `json:"check_timeout"` + CheckcommandName string `json:"checkcommand_name"` + CheckcommandId types.Binary `json:"checkcommand_id"` + CommandEndpointName string `json:"command_endpoint_name"` + CommandEndpointId types.Binary `json:"command_endpoint_id"` + DisplayName string `json:"display_name"` + EventHandlerEnabled types.Bool `json:"event_handler_enabled"` + EventcommandName string `json:"eventcommand_name"` + EventcommandId types.Binary `json:"eventcommand_id"` + FlappingEnabled types.Bool `json:"flapping_enabled"` + FlappingThresholdHigh float64 `json:"flapping_threshold_high"` + FlappingThresholdLow float64 `json:"flapping_threshold_low"` + IconImageAlt string `json:"icon_image_alt"` + IconImageId types.Binary `json:"icon_image_id"` + IsVolatile types.Bool `json:"is_volatile"` + MaxCheckAttempts float64 `json:"max_check_attempts"` + Notes string `json:"notes"` + NotesUrlId types.Binary `json:"notes_url_id"` + NotificationsEnabled types.Bool `json:"notifications_enabled"` + PassiveChecksEnabled types.Bool `json:"passive_checks_enabled"` + PerfdataEnabled types.Bool `json:"perfdata_enabled"` + ZoneName string `json:"zone_name"` + ZoneId types.Binary `json:"zone_id"` +} + +// Assert interface compliance. +var ( + _ contracts.Initer = (*Checkable)(nil) +) diff --git a/pkg/icingadb/v1/command.go b/pkg/icingadb/v1/command.go new file mode 100644 index 0000000..cd4db2a --- /dev/null +++ b/pkg/icingadb/v1/command.go @@ -0,0 +1,169 @@ +package v1 + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type Command struct { + EntityWithChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + NameCiMeta `json:",inline"` + ZoneId types.Binary `json:"zone_id"` + Command string `json:"command"` + Timeout uint32 `json:"timeout"` +} + +type CommandArgument struct { + EntityWithChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + ArgumentKey string `json:"argument_key"` + ArgumentValue types.String `json:"value"` + ArgumentOrder types.Int `json:"order"` + Description types.String `json:"description"` + ArgumentKeyOverride types.String `json:"key"` + RepeatKey types.Bool `json:"repeat_key"` + Required types.Bool `json:"required"` + SetIf types.String `json:"set_if"` + Separator types.String `json:"separator"` + SkipKey types.Bool `json:"skip_key"` +} + +// Init implements the contracts.Initer interface. +func (ca *CommandArgument) Init() { + ca.RepeatKey = types.Bool{ + Bool: true, + Valid: true, + } + + ca.Required = types.Bool{ + Bool: false, + Valid: true, + } + + ca.SkipKey = types.Bool{ + Bool: false, + Valid: true, + } +} + +type CommandEnvvar struct { + EntityWithChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + EnvvarKey string `json:"envvar_key"` + EnvvarValue string `json:"value"` +} + +type Checkcommand struct { + Command `json:",inline"` +} + +type CheckcommandArgument struct { + CommandArgument `json:",inline"` + CheckcommandId types.Binary `json:"checkcommand_id"` +} + +type CheckcommandEnvvar struct { + CommandEnvvar `json:",inline"` + CheckcommandId types.Binary `json:"checkcommand_id"` +} + +type CheckcommandCustomvar struct { + CustomvarMeta `json:",inline"` + CheckcommandId types.Binary `json:"checkcommand_id"` +} + +type Eventcommand struct { + Command `json:",inline"` +} + +type EventcommandArgument struct { + CommandArgument `json:",inline"` + EventcommandId types.Binary `json:"eventcommand_id"` +} + +type EventcommandEnvvar struct { + CommandEnvvar `json:",inline"` + EventcommandId types.Binary `json:"eventcommand_id"` +} + +type EventcommandCustomvar struct { + CustomvarMeta `json:",inline"` + EventcommandId types.Binary `json:"eventcommand_id"` +} + +type Notificationcommand struct { + Command `json:",inline"` +} + +type NotificationcommandArgument struct { + CommandArgument `json:",inline"` + NotificationcommandId types.Binary `json:"notificationcommand_id"` +} + +type NotificationcommandEnvvar struct { + CommandEnvvar `json:",inline"` + NotificationcommandId types.Binary `json:"notificationcommand_id"` +} + +type NotificationcommandCustomvar struct { + CustomvarMeta `json:",inline"` + NotificationcommandId types.Binary `json:"notificationcommand_id"` +} + +func NewCheckcommand() contracts.Entity { + return &Checkcommand{} +} + +func NewCheckcommandArgument() contracts.Entity { + return &CheckcommandArgument{} +} + +func NewCheckcommandEnvvar() contracts.Entity { + return &CheckcommandEnvvar{} +} + +func NewCheckcommandCustomvar() contracts.Entity { + return &CheckcommandCustomvar{} +} + +func NewEventcommand() contracts.Entity { + return &Eventcommand{} +} + +func NewEventcommandArgument() contracts.Entity { + return &EventcommandArgument{} +} + +func NewEventcommandEnvvar() contracts.Entity { + return &EventcommandEnvvar{} +} + +func NewEventcommandCustomvar() contracts.Entity { + return &EventcommandCustomvar{} +} + +func NewNotificationcommand() contracts.Entity { + return &Notificationcommand{} +} + +func NewNotificationcommandArgument() contracts.Entity { + return &NotificationcommandArgument{} +} + +func NewNotificationcommandEnvvar() contracts.Entity { + return &NotificationcommandEnvvar{} +} + +func NewNotificationcommandCustomvar() contracts.Entity { + return &NotificationcommandCustomvar{} +} + +// Assert interface compliance. +var ( + _ contracts.Initer = (*Command)(nil) + _ contracts.Initer = (*CommandArgument)(nil) + _ contracts.Initer = (*Checkcommand)(nil) + _ contracts.Initer = (*Eventcommand)(nil) + _ contracts.Initer = (*Notificationcommand)(nil) +) diff --git a/pkg/icingadb/v1/comment.go b/pkg/icingadb/v1/comment.go new file mode 100644 index 0000000..b720fac --- /dev/null +++ b/pkg/icingadb/v1/comment.go @@ -0,0 +1,27 @@ +package v1 + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type Comment struct { + EntityWithChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + NameMeta `json:",inline"` + ObjectType string `json:"object_type"` + HostId types.Binary `json:"host_id"` + ServiceId types.Binary `json:"service_id"` + Author string `json:"author"` + Text string `json:"text"` + EntryType types.CommentType `json:"entry_type"` + EntryTime types.UnixMilli `json:"entry_time"` + IsPersistent types.Bool `json:"is_persistent"` + IsSticky types.Bool `json:"is_sticky"` + ExpireTime types.UnixMilli `json:"expire_time"` + ZoneId types.Binary `json:"zone_id"` +} + +func NewComment() contracts.Entity { + return &Comment{} +} diff --git a/pkg/icingadb/v1/customvar.go b/pkg/icingadb/v1/customvar.go new file mode 100644 index 0000000..462b87c --- /dev/null +++ b/pkg/icingadb/v1/customvar.go @@ -0,0 +1,157 @@ +package v1 + +import ( + "context" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/flatten" + "github.com/icinga/icingadb/pkg/icingadb/objectpacker" + "github.com/icinga/icingadb/pkg/types" + "github.com/icinga/icingadb/pkg/utils" + "golang.org/x/sync/errgroup" + "runtime" +) + +type Customvar struct { + EntityWithoutChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + NameMeta `json:",inline"` + Value string `json:"value"` +} + +type CustomvarFlat struct { + CustomvarMeta `json:",inline"` + Flatname string `json:"flatname"` + FlatnameChecksum types.Binary `json:"flatname_checksum"` + Flatvalue types.String `json:"flatvalue"` +} + +func NewCustomvar() contracts.Entity { + return &Customvar{} +} + +func NewCustomvarFlat() contracts.Entity { + return &CustomvarFlat{} +} + +// ExpandCustomvars streams custom variables from a provided channel and returns three channels, +// the first providing the unmodified custom variable read from the input channel, +// the second channel providing the corresponding resolved flat custom variables, +// and the third channel providing an error, if any. +func ExpandCustomvars( + ctx context.Context, + cvs <-chan contracts.Entity, +) (customvars, flatCustomvars <-chan contracts.Entity, errs <-chan error) { + g, ctx := errgroup.WithContext(ctx) + + // Multiplex cvs to use them both for customvar and customvar_flat. + var forward chan contracts.Entity + customvars, forward = multiplexCvs(ctx, g, cvs) + flatCustomvars = flattenCustomvars(ctx, g, forward) + errs = com.WaitAsync(g) + + return +} + +// multiplexCvs streams custom variables from a provided channel and +// forwards each custom variable to the two returned output channels. +func multiplexCvs( + ctx context.Context, + g *errgroup.Group, + cvs <-chan contracts.Entity, +) (customvars1, customvars2 chan contracts.Entity) { + customvars1 = make(chan contracts.Entity) + customvars2 = make(chan contracts.Entity) + + g.Go(func() error { + defer close(customvars1) + defer close(customvars2) + + for { + select { + case cv, ok := <-cvs: + if !ok { + return nil + } + + select { + case customvars1 <- cv: + case <-ctx.Done(): + return ctx.Err() + } + + select { + case customvars2 <- cv: + case <-ctx.Done(): + return ctx.Err() + } + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + return +} + +// flattenCustomvars creates and yields flat custom variables from the provided custom variables. +func flattenCustomvars(ctx context.Context, g *errgroup.Group, cvs <-chan contracts.Entity) (flatCustomvars chan contracts.Entity) { + flatCustomvars = make(chan contracts.Entity) + + g.Go(func() error { + defer close(flatCustomvars) + + g, ctx := errgroup.WithContext(ctx) + + for i := 0; i < runtime.NumCPU(); i++ { + g.Go(func() error { + for entity := range cvs { + var value interface{} + customvar := entity.(*Customvar) + if err := internal.UnmarshalJSON([]byte(customvar.Value), &value); err != nil { + return err + } + + flattened := flatten.Flatten(value, customvar.Name) + + for flatname, flatvalue := range flattened { + var fv interface{} + if flatvalue.Valid { + fv = flatvalue.String + } + + select { + case flatCustomvars <- &CustomvarFlat{ + CustomvarMeta: CustomvarMeta{ + EntityWithoutChecksum: EntityWithoutChecksum{ + IdMeta: IdMeta{ + // TODO(el): Schema comment is wrong. + // Without customvar.Id we would produce duplicate keys here. + Id: utils.Checksum(objectpacker.MustPackSlice(customvar.EnvironmentId, customvar.Id, flatname, fv)), + }, + }, + EnvironmentMeta: EnvironmentMeta{ + EnvironmentId: customvar.EnvironmentId, + }, + CustomvarId: customvar.Id, + }, + Flatname: flatname, + FlatnameChecksum: utils.Checksum(flatname), + Flatvalue: flatvalue, + }: + case <-ctx.Done(): + return ctx.Err() + } + } + } + + return nil + }) + } + + return g.Wait() + }) + + return +} diff --git a/pkg/icingadb/v1/downtime.go b/pkg/icingadb/v1/downtime.go new file mode 100644 index 0000000..0878e5e --- /dev/null +++ b/pkg/icingadb/v1/downtime.go @@ -0,0 +1,35 @@ +package v1 + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type Downtime struct { + EntityWithChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + NameMeta `json:",inline"` + TriggeredById types.Binary `json:"triggered_by_id"` + ParentId types.Binary `json:"parent_id"` + ObjectType string `json:"object_type"` + HostId types.Binary `json:"host_id"` + ServiceId types.Binary `json:"service_id"` + Author string `json:"author"` + Comment string `json:"comment"` + EntryTime types.UnixMilli `json:"entry_time"` + ScheduledStartTime types.UnixMilli `json:"scheduled_start_time"` + ScheduledEndTime types.UnixMilli `json:"scheduled_end_time"` + ScheduledDuration uint64 `json:"scheduled_duration"` + IsFlexible types.Bool `json:"is_flexible"` + FlexibleDuration uint64 `json:"flexible_duration"` + IsInEffect types.Bool `json:"is_in_effect"` + StartTime types.UnixMilli `json:"start_time"` + EndTime types.UnixMilli `json:"end_time"` + Duration uint64 `json:"duration"` + ScheduledBy types.String `json:"scheduled_by"` + ZoneId types.Binary `json:"zone_id"` +} + +func NewDowntime() contracts.Entity { + return &Downtime{} +} diff --git a/pkg/icingadb/v1/endpoint.go b/pkg/icingadb/v1/endpoint.go new file mode 100644 index 0000000..6abe9d7 --- /dev/null +++ b/pkg/icingadb/v1/endpoint.go @@ -0,0 +1,36 @@ +package v1 + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type Endpoint struct { + EntityWithChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + NameCiMeta `json:",inline"` + ZoneId types.Binary `json:"zone_id"` +} + +type Zone struct { + EntityWithChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + NameCiMeta `json:",inline"` + IsGlobal types.Bool `json:"is_global"` + ParentId types.Binary `json:"parent_id"` + Depth uint8 `json:"depth"` +} + +func NewEndpoint() contracts.Entity { + return &Endpoint{} +} + +func NewZone() contracts.Entity { + return &Zone{} +} + +// Assert interface compliance. +var ( + _ contracts.Initer = (*Endpoint)(nil) + _ contracts.Initer = (*Zone)(nil) +) diff --git a/pkg/icingadb/v1/entity.go b/pkg/icingadb/v1/entity.go new file mode 100644 index 0000000..5dfa3d2 --- /dev/null +++ b/pkg/icingadb/v1/entity.go @@ -0,0 +1,28 @@ +package v1 + +import "github.com/icinga/icingadb/pkg/contracts" + +// EntityWithoutChecksum represents entities without a checksum. +type EntityWithoutChecksum struct { + IdMeta `json:",inline"` +} + +// Fingerprint implements the contracts.Fingerprinter interface. +func (e EntityWithoutChecksum) Fingerprint() contracts.Fingerprinter { + return e +} + +// EntityWithChecksum represents entities with a checksum. +type EntityWithChecksum struct { + EntityWithoutChecksum `json:",inline"` + ChecksumMeta `json:",inline"` +} + +// Fingerprint implements the contracts.Fingerprinter interface. +func (e EntityWithChecksum) Fingerprint() contracts.Fingerprinter { + return e +} + +func NewEntityWithChecksum() contracts.Entity { + return &EntityWithChecksum{} +} diff --git a/pkg/icingadb/v1/environment.go b/pkg/icingadb/v1/environment.go new file mode 100644 index 0000000..80e06f6 --- /dev/null +++ b/pkg/icingadb/v1/environment.go @@ -0,0 +1,40 @@ +package v1 + +import ( + "context" + "github.com/icinga/icingadb/pkg/types" +) + +type Environment struct { + EntityWithoutChecksum `json:",inline"` + Name types.String `json:"name"` +} + +// NewContext returns a new Context that carries this Environment as value. +func (e *Environment) NewContext(parent context.Context) context.Context { + return context.WithValue(parent, environmentContextKey, e) +} + +// Meta returns the EnvironmentMeta for this Environment. +func (e *Environment) Meta() *EnvironmentMeta { + return &EnvironmentMeta{EnvironmentId: e.Id} +} + +// EnvironmentFromContext returns the Environment value stored in ctx, if any: +// +// e, ok := EnvironmentFromContext(ctx) +// if !ok { +// // Error handling. +// } +func EnvironmentFromContext(ctx context.Context) (*Environment, bool) { + if e, ok := ctx.Value(environmentContextKey).(*Environment); ok { + return e, true + } + + return nil, false +} + +// environmentContextKey is the key for Environment values in contexts. +// It's not exported, so callers use Environment.NewContext and EnvironmentFromContext +// instead of using that key directly. +var environmentContextKey contextKey diff --git a/pkg/icingadb/v1/history/ack.go b/pkg/icingadb/v1/history/ack.go new file mode 100644 index 0000000..094a7e4 --- /dev/null +++ b/pkg/icingadb/v1/history/ack.go @@ -0,0 +1,82 @@ +package history + +import ( + "database/sql/driver" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/types" +) + +type AckHistoryUpserter struct { + ClearTime types.UnixMilli `json:"clear_time"` + ClearedBy types.String `json:"cleared_by"` +} + +// Upsert implements the contracts.Upserter interface. +func (ahu *AckHistoryUpserter) Upsert() interface{} { + return ahu +} + +type AcknowledgementHistory struct { + v1.EntityWithoutChecksum `json:",inline"` + HistoryTableMeta `json:",inline"` + AckHistoryUpserter `json:",inline"` + SetTime types.UnixMilli `json:"set_time"` + Author types.String `json:"author"` + Comment types.String `json:"comment"` + ExpireTime types.UnixMilli `json:"expire_time"` + IsPersistent types.Bool `json:"is_persistent"` + IsSticky types.Bool `json:"is_sticky"` +} + +type HistoryAck struct { + HistoryMeta `json:",inline"` + AcknowledgementHistoryId types.Binary `json:"id"` + + // Idea: read SetTime and ClearTime from Redis and let EventTime decide which of them to write to MySQL. + // So EventTime doesn't have to be read from Redis (json:"-") + // and the others don't have to be written to MySQL (db:"-"). + SetTime types.UnixMilli `json:"set_time" db:"-"` + ClearTime types.UnixMilli `json:"clear_time" db:"-"` + EventTime AckEventTime `json:"-"` +} + +// Init implements the contracts.Initer interface. +func (h *HistoryAck) Init() { + h.EventTime.History = h +} + +// TableName implements the contracts.TableNamer interface. +func (*HistoryAck) TableName() string { + return "history" +} + +type AckEventTime struct { + History *HistoryAck `db:"-"` +} + +// Value implements the driver.Valuer interface. +// Supports SQL NULL. +func (et AckEventTime) Value() (driver.Value, error) { + if et.History == nil { + return nil, nil + } + + switch et.History.EventType { + case "ack_set": + return et.History.SetTime.Value() + case "ack_clear": + return et.History.ClearTime.Value() + default: + return nil, nil + } +} + +// Assert interface compliance. +var ( + _ UpserterEntity = (*AcknowledgementHistory)(nil) + _ contracts.Initer = (*HistoryAck)(nil) + _ contracts.TableNamer = (*HistoryAck)(nil) + _ UpserterEntity = (*HistoryAck)(nil) + _ driver.Valuer = AckEventTime{} +) diff --git a/pkg/icingadb/v1/history/comment.go b/pkg/icingadb/v1/history/comment.go new file mode 100644 index 0000000..d3a5743 --- /dev/null +++ b/pkg/icingadb/v1/history/comment.go @@ -0,0 +1,120 @@ +package history + +import ( + "database/sql/driver" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type CommentHistoryEntity struct { + CommentId types.Binary `json:"comment_id"` +} + +// Fingerprint implements part of the contracts.Entity interface. +func (che CommentHistoryEntity) Fingerprint() contracts.Fingerprinter { + return che +} + +// ID implements part of the contracts.Entity interface. +func (che CommentHistoryEntity) ID() contracts.ID { + return che.CommentId +} + +// SetID implements part of the contracts.Entity interface. +func (che *CommentHistoryEntity) SetID(id contracts.ID) { + che.CommentId = id.(types.Binary) +} + +type CommentHistoryUpserter struct { + RemovedBy types.String `json:"removed_by"` + RemoveTime types.UnixMilli `json:"remove_time"` + HasBeenRemoved types.Bool `json:"has_been_removed"` +} + +// Upsert implements the contracts.Upserter interface. +func (chu *CommentHistoryUpserter) Upsert() interface{} { + return chu +} + +type CommentHistory struct { + CommentHistoryEntity `json:",inline"` + HistoryTableMeta `json:",inline"` + CommentHistoryUpserter `json:",inline"` + EntryTime types.UnixMilli `json:"entry_time"` + Author string `json:"author"` + Comment string `json:"comment"` + EntryType types.CommentType `json:"entry_type"` + IsPersistent types.Bool `json:"is_persistent"` + IsSticky types.Bool `json:"is_sticky"` + ExpireTime types.UnixMilli `json:"expire_time"` +} + +// Init implements the contracts.Initer interface. +func (ch *CommentHistory) Init() { + ch.HasBeenRemoved = types.Bool{ + Bool: false, + Valid: true, + } +} + +type HistoryComment struct { + HistoryMeta `json:",inline"` + CommentHistoryId types.Binary `json:"comment_id"` + + // Idea: read EntryTime, RemoveTime and ExpireTime from Redis + // and let EventTime decide which of them to write to MySQL. + // So EventTime doesn't have to be read from Redis (json:"-") + // and the others don't have to be written to MySQL (db:"-"). + EntryTime types.UnixMilli `json:"entry_time" db:"-"` + RemoveTime types.UnixMilli `json:"remove_time" db:"-"` + ExpireTime types.UnixMilli `json:"expire_time" db:"-"` + EventTime CommentEventTime `json:"-"` +} + +// Init implements the contracts.Initer interface. +func (h *HistoryComment) Init() { + h.EventTime.History = h +} + +// TableName implements the contracts.TableNamer interface. +func (*HistoryComment) TableName() string { + return "history" +} + +type CommentEventTime struct { + History *HistoryComment `db:"-"` +} + +// Value implements the driver.Valuer interface. +// Supports SQL NULL. +func (et CommentEventTime) Value() (driver.Value, error) { + if et.History == nil { + return nil, nil + } + + switch et.History.EventType { + case "comment_add": + return et.History.EntryTime.Value() + case "comment_remove": + v, err := et.History.RemoveTime.Value() + if err == nil && v == nil { + return et.History.ExpireTime.Value() + } + + return v, err + default: + return nil, nil + } +} + +// Assert interface compliance. +var ( + _ contracts.Entity = (*CommentHistoryEntity)(nil) + _ contracts.Upserter = (*CommentHistoryUpserter)(nil) + _ contracts.Initer = (*CommentHistory)(nil) + _ UpserterEntity = (*CommentHistory)(nil) + _ contracts.Initer = (*HistoryComment)(nil) + _ contracts.TableNamer = (*HistoryComment)(nil) + _ UpserterEntity = (*HistoryComment)(nil) + _ driver.Valuer = CommentEventTime{} +) diff --git a/pkg/icingadb/v1/history/downtime.go b/pkg/icingadb/v1/history/downtime.go new file mode 100644 index 0000000..99f93f6 --- /dev/null +++ b/pkg/icingadb/v1/history/downtime.go @@ -0,0 +1,161 @@ +package history + +import ( + "database/sql/driver" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type DowntimeHistoryEntity struct { + DowntimeId types.Binary `json:"downtime_id"` +} + +// Fingerprint implements part of the contracts.Entity interface. +func (dhe DowntimeHistoryEntity) Fingerprint() contracts.Fingerprinter { + return dhe +} + +// ID implements part of the contracts.Entity interface. +func (dhe DowntimeHistoryEntity) ID() contracts.ID { + return dhe.DowntimeId +} + +// SetID implements part of the contracts.Entity interface. +func (dhe *DowntimeHistoryEntity) SetID(id contracts.ID) { + dhe.DowntimeId = id.(types.Binary) +} + +type DowntimeHistoryUpserter struct { + CancelledBy types.String `json:"cancelled_by"` + HasBeenCancelled types.Bool `json:"has_been_cancelled"` + CancelTime types.UnixMilli `json:"cancel_time"` +} + +// Upsert implements the contracts.Upserter interface. +func (dhu *DowntimeHistoryUpserter) Upsert() interface{} { + return dhu +} + +type DowntimeHistory struct { + DowntimeHistoryEntity `json:",inline"` + HistoryTableMeta `json:",inline"` + DowntimeHistoryUpserter `json:",inline"` + TriggeredById types.Binary `json:"triggered_by_id"` + ParentId types.Binary `json:"parent_id"` + EntryTime types.UnixMilli `json:"entry_time"` + Author string `json:"author"` + Comment string `json:"comment"` + IsFlexible types.Bool `json:"is_flexible"` + FlexibleDuration uint64 `json:"flexible_duration"` + ScheduledStartTime types.UnixMilli `json:"scheduled_start_time"` + ScheduledEndTime types.UnixMilli `json:"scheduled_end_time"` + StartTime types.UnixMilli `json:"start_time"` + EndTime types.UnixMilli `json:"end_time"` + ScheduledBy types.String `json:"scheduled_by"` + TriggerTime types.UnixMilli `json:"trigger_time"` +} + +type HistoryDowntime struct { + HistoryMeta `json:",inline"` + DowntimeHistoryId types.Binary `json:"downtime_id"` + + // Idea: read StartTime, CancelTime, EndTime and HasBeenCancelled from Redis + // and let EventTime decide based on HasBeenCancelled which of the others to write to MySQL. + // So EventTime doesn't have to be read from Redis (json:"-") + // and the others don't have to be written to MySQL (db:"-"). + StartTime types.UnixMilli `json:"start_time" db:"-"` + CancelTime types.UnixMilli `json:"cancel_time" db:"-"` + EndTime types.UnixMilli `json:"end_time" db:"-"` + HasBeenCancelled types.Bool `json:"has_been_cancelled" db:"-"` + EventTime DowntimeEventTime `json:"-"` +} + +// Init implements the contracts.Initer interface. +func (h *HistoryDowntime) Init() { + h.EventTime.History = h +} + +// TableName implements the contracts.TableNamer interface. +func (*HistoryDowntime) TableName() string { + return "history" +} + +type SlaHistoryDowntime struct { + DowntimeHistoryEntity `json:",inline"` + HistoryTableMeta `json:",inline"` + SlaHistoryDowntimeUpserter `json:",inline"` + DowntimeStart types.UnixMilli `json:"start_time"` + HasBeenCancelled types.Bool `json:"has_been_cancelled" db:"-"` + CancelTime types.UnixMilli `json:"cancel_time" db:"-"` + EndTime types.UnixMilli `json:"end_time" db:"-"` +} + +// Init implements the contracts.Initer interface. +func (s *SlaHistoryDowntime) Init() { + s.DowntimeEnd.History = s +} + +type SlaHistoryDowntimeUpserter struct { + DowntimeEnd SlaDowntimeEndTime `json:"-"` +} + +// Upsert implements the contracts.Upserter interface. +func (h *SlaHistoryDowntimeUpserter) Upsert() interface{} { + return h +} + +type DowntimeEventTime struct { + History *HistoryDowntime `db:"-"` +} + +// Value implements the driver.Valuer interface. +// Supports SQL NULL. +func (et DowntimeEventTime) Value() (driver.Value, error) { + if et.History == nil { + return nil, nil + } + + switch et.History.EventType { + case "downtime_start": + return et.History.StartTime.Value() + case "downtime_end": + if !et.History.HasBeenCancelled.Valid { + return nil, nil + } + + if et.History.HasBeenCancelled.Bool { + return et.History.CancelTime.Value() + } else { + return et.History.EndTime.Value() + } + default: + return nil, nil + } +} + +type SlaDowntimeEndTime struct { + History *SlaHistoryDowntime `db:"-"` +} + +// Value implements the driver.Valuer interface. +func (et SlaDowntimeEndTime) Value() (driver.Value, error) { + if et.History.HasBeenCancelled.Valid && et.History.HasBeenCancelled.Bool { + return et.History.CancelTime.Value() + } else { + return et.History.EndTime.Value() + } +} + +// Assert interface compliance. +var ( + _ contracts.Entity = (*DowntimeHistoryEntity)(nil) + _ contracts.Upserter = (*DowntimeHistoryUpserter)(nil) + _ UpserterEntity = (*DowntimeHistory)(nil) + _ contracts.Initer = (*HistoryDowntime)(nil) + _ contracts.TableNamer = (*HistoryDowntime)(nil) + _ UpserterEntity = (*HistoryDowntime)(nil) + _ contracts.Initer = (*SlaHistoryDowntime)(nil) + _ UpserterEntity = (*SlaHistoryDowntime)(nil) + _ driver.Valuer = DowntimeEventTime{} + _ driver.Valuer = SlaDowntimeEndTime{} +) diff --git a/pkg/icingadb/v1/history/flapping.go b/pkg/icingadb/v1/history/flapping.go new file mode 100644 index 0000000..9280b27 --- /dev/null +++ b/pkg/icingadb/v1/history/flapping.go @@ -0,0 +1,80 @@ +package history + +import ( + "database/sql/driver" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/types" +) + +type FlappingHistoryUpserter struct { + EndTime types.UnixMilli `json:"end_time"` + PercentStateChangeEnd types.Float `json:"percent_state_change_end"` + FlappingThresholdLow float32 `json:"flapping_threshold_low"` + FlappingThresholdHigh float32 `json:"flapping_threshold_high"` +} + +// Upsert implements the contracts.Upserter interface. +func (fhu *FlappingHistoryUpserter) Upsert() interface{} { + return fhu +} + +type FlappingHistory struct { + v1.EntityWithoutChecksum `json:",inline"` + HistoryTableMeta `json:",inline"` + FlappingHistoryUpserter `json:",inline"` + StartTime types.UnixMilli `json:"start_time"` + PercentStateChangeStart types.Float `json:"percent_state_change_start"` +} + +type HistoryFlapping struct { + HistoryMeta `json:",inline"` + FlappingHistoryId types.Binary `json:"id"` + + // Idea: read StartTime and EndTime from Redis and let EventTime decide which of them to write to MySQL. + // So EventTime doesn't have to be read from Redis (json:"-") + // and the others don't have to be written to MySQL (db:"-"). + StartTime types.UnixMilli `json:"start_time" db:"-"` + EndTime types.UnixMilli `json:"end_time" db:"-"` + EventTime FlappingEventTime `json:"-"` +} + +// Init implements the contracts.Initer interface. +func (h *HistoryFlapping) Init() { + h.EventTime.History = h +} + +// TableName implements the contracts.TableNamer interface. +func (*HistoryFlapping) TableName() string { + return "history" +} + +type FlappingEventTime struct { + History *HistoryFlapping `db:"-"` +} + +// Value implements the driver.Valuer interface. +// Supports SQL NULL. +func (et FlappingEventTime) Value() (driver.Value, error) { + if et.History == nil { + return nil, nil + } + + switch et.History.EventType { + case "flapping_start": + return et.History.StartTime.Value() + case "flapping_end": + return et.History.EndTime.Value() + default: + return nil, nil + } +} + +// Assert interface compliance. +var ( + _ UpserterEntity = (*FlappingHistory)(nil) + _ contracts.Initer = (*HistoryFlapping)(nil) + _ contracts.TableNamer = (*HistoryFlapping)(nil) + _ UpserterEntity = (*HistoryFlapping)(nil) + _ driver.Valuer = FlappingEventTime{} +) diff --git a/pkg/icingadb/v1/history/meta.go b/pkg/icingadb/v1/history/meta.go new file mode 100644 index 0000000..434ed11 --- /dev/null +++ b/pkg/icingadb/v1/history/meta.go @@ -0,0 +1,80 @@ +package history + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/types" +) + +// UpserterEntity provides upsert for entities. +type UpserterEntity interface { + contracts.Upserter + contracts.Entity +} + +// HistoryTableEntity is embedded by every concrete history type that has its own table. +type HistoryTableEntity struct { + v1.EntityWithoutChecksum `json:",inline"` +} + +// Upsert implements the contracts.Upserter interface. +// Update only the Id (effectively nothing). +func (hte HistoryTableEntity) Upsert() interface{} { + return hte +} + +// HistoryEntity is embedded by every concrete history type. +type HistoryEntity struct { + Id types.Binary `json:"event_id"` +} + +// Fingerprint implements part of the contracts.Entity interface. +func (he HistoryEntity) Fingerprint() contracts.Fingerprinter { + return he +} + +// ID implements part of the contracts.Entity interface. +func (he HistoryEntity) ID() contracts.ID { + return he.Id +} + +// SetID implements part of the contracts.Entity interface. +func (he *HistoryEntity) SetID(id contracts.ID) { + he.Id = id.(types.Binary) +} + +// Upsert implements the contracts.Upserter interface. +// Update only the Id (effectively nothing). +func (he HistoryEntity) Upsert() interface{} { + return he +} + +// HistoryTableMeta is embedded by every concrete history type that has its own table. +type HistoryTableMeta struct { + EnvironmentId types.Binary `json:"environment_id"` + EndpointId types.Binary `json:"endpoint_id"` + ObjectType string `json:"object_type"` + HostId types.Binary `json:"host_id"` + ServiceId types.Binary `json:"service_id"` +} + +// HistoryMeta is embedded by every concrete history type that belongs to the history table. +type HistoryMeta struct { + HistoryEntity `json:",inline"` + EnvironmentId types.Binary `json:"environment_id"` + EndpointId types.Binary `json:"endpoint_id"` + ObjectType string `json:"object_type"` + HostId types.Binary `json:"host_id"` + ServiceId types.Binary `json:"service_id"` + EventType string `json:"event_type"` +} + +// Assert interface compliance. +var ( + _ contracts.Entity = (*HistoryTableEntity)(nil) + _ contracts.Upserter = HistoryTableEntity{} + _ contracts.Entity = (*HistoryEntity)(nil) + _ contracts.Upserter = HistoryEntity{} + _ contracts.Entity = (*HistoryMeta)(nil) + _ contracts.Upserter = (*HistoryMeta)(nil) +) diff --git a/pkg/icingadb/v1/history/notification.go b/pkg/icingadb/v1/history/notification.go new file mode 100644 index 0000000..17fd375 --- /dev/null +++ b/pkg/icingadb/v1/history/notification.go @@ -0,0 +1,50 @@ +package history + +import ( + "github.com/icinga/icingadb/pkg/contracts" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/types" +) + +type NotificationHistory struct { + HistoryTableEntity `json:",inline"` + HistoryTableMeta `json:",inline"` + NotificationId types.Binary `json:"notification_id"` + Type types.NotificationType `json:"type"` + SendTime types.UnixMilli `json:"send_time"` + State uint8 `json:"state"` + PreviousHardState uint8 `json:"previous_hard_state"` + Author string `json:"author"` + Text types.String `json:"text"` + UsersNotified uint16 `json:"users_notified"` +} + +type UserNotificationHistory struct { + v1.EntityWithoutChecksum `json:",inline"` + v1.EnvironmentMeta `json:",inline"` + NotificationHistoryId types.Binary `json:"notification_history_id"` + UserId types.Binary `json:"user_id"` +} + +func (u *UserNotificationHistory) Upsert() interface{} { + return u +} + +type HistoryNotification struct { + HistoryMeta `json:",inline"` + NotificationHistoryId types.Binary `json:"id"` + EventTime types.UnixMilli `json:"send_time"` +} + +// TableName implements the contracts.TableNamer interface. +func (*HistoryNotification) TableName() string { + return "history" +} + +// Assert interface compliance. +var ( + _ UpserterEntity = (*NotificationHistory)(nil) + _ UpserterEntity = (*UserNotificationHistory)(nil) + _ contracts.TableNamer = (*HistoryNotification)(nil) + _ UpserterEntity = (*HistoryNotification)(nil) +) diff --git a/pkg/icingadb/v1/history/state.go b/pkg/icingadb/v1/history/state.go new file mode 100644 index 0000000..dec13b0 --- /dev/null +++ b/pkg/icingadb/v1/history/state.go @@ -0,0 +1,51 @@ +package history + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type StateHistory struct { + HistoryTableEntity `json:",inline"` + HistoryTableMeta `json:",inline"` + EventTime types.UnixMilli `json:"event_time"` + StateType types.StateType `json:"state_type"` + SoftState uint8 `json:"soft_state"` + HardState uint8 `json:"hard_state"` + PreviousSoftState uint8 `json:"previous_soft_state"` + PreviousHardState uint8 `json:"previous_hard_state"` + CheckAttempt uint8 `json:"check_attempt"` + Output types.String `json:"output"` + LongOutput types.String `json:"long_output"` + MaxCheckAttempts uint32 `json:"max_check_attempts"` + CheckSource types.String `json:"check_source"` + SchedulingSource types.String `json:"scheduling_source"` +} + +type HistoryState struct { + HistoryMeta `json:",inline"` + StateHistoryId types.Binary `json:"id"` + EventTime types.UnixMilli `json:"event_time"` +} + +// TableName implements the contracts.TableNamer interface. +func (*HistoryState) TableName() string { + return "history" +} + +type SlaHistoryState struct { + HistoryTableEntity `json:",inline"` + HistoryTableMeta `json:",inline"` + EventTime types.UnixMilli `json:"event_time"` + StateType types.StateType `json:"state_type" db:"-"` + HardState uint8 `json:"hard_state"` + PreviousHardState uint8 `json:"previous_hard_state"` +} + +// Assert interface compliance. +var ( + _ UpserterEntity = (*StateHistory)(nil) + _ contracts.TableNamer = (*HistoryState)(nil) + _ UpserterEntity = (*HistoryState)(nil) + _ UpserterEntity = (*SlaHistoryState)(nil) +) diff --git a/pkg/icingadb/v1/host.go b/pkg/icingadb/v1/host.go new file mode 100644 index 0000000..fbab47c --- /dev/null +++ b/pkg/icingadb/v1/host.go @@ -0,0 +1,122 @@ +package v1 + +import ( + "bytes" + "database/sql/driver" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" + "net" +) + +type Host struct { + Checkable `json:",inline"` + Address string `json:"address"` + Address6 string `json:"address6"` + AddressBin AddressBin `json:"-"` + Address6Bin Address6Bin `json:"-"` +} + +// Init implements the contracts.Initer interface. +func (h *Host) Init() { + h.Checkable.Init() + h.AddressBin.Host = h + h.Address6Bin.Host = h +} + +type AddressBin struct { + Host *Host `db:"-"` +} + +var v4InV6Prefix = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff} + +// Value implements the driver.Valuer interface. +func (ab AddressBin) Value() (driver.Value, error) { + if ab.Host == nil { + return nil, nil + } + + ip := net.ParseIP(ab.Host.Address) + if ip == nil { + return nil, nil + } + + if ip = bytes.TrimPrefix(ip, v4InV6Prefix); len(ip) == 4 { + return []byte(ip), nil + } else { + return nil, nil + } +} + +type Address6Bin struct { + Host *Host `db:"-"` +} + +// Value implements the driver.Valuer interface. +func (ab Address6Bin) Value() (driver.Value, error) { + if ab.Host == nil { + return nil, nil + } + + if ip := net.ParseIP(ab.Host.Address6); ip == nil { + return nil, nil + } else { + return []byte(ip), nil + } +} + +type HostCustomvar struct { + CustomvarMeta `json:",inline"` + HostId types.Binary `json:"host_id"` +} + +type HostState struct { + State `json:",inline"` + HostId types.Binary `json:"host_id"` +} + +type Hostgroup struct { + GroupMeta `json:",inline"` +} + +type HostgroupCustomvar struct { + CustomvarMeta `json:",inline"` + HostgroupId types.Binary `json:"hostgroup_id"` +} + +type HostgroupMember struct { + MemberMeta `json:",inline"` + HostId types.Binary `json:"host_id"` + HostgroupId types.Binary `json:"hostgroup_id"` +} + +func NewHost() contracts.Entity { + return &Host{} +} + +func NewHostCustomvar() contracts.Entity { + return &HostCustomvar{} +} + +func NewHostState() contracts.Entity { + return &HostState{} +} + +func NewHostgroup() contracts.Entity { + return &Hostgroup{} +} + +func NewHostgroupCustomvar() contracts.Entity { + return &HostgroupCustomvar{} +} + +func NewHostgroupMember() contracts.Entity { + return &HostgroupMember{} +} + +// Assert interface compliance. +var ( + _ contracts.Initer = (*Host)(nil) + _ driver.Valuer = AddressBin{} + _ driver.Valuer = Address6Bin{} + _ contracts.Initer = (*Hostgroup)(nil) +) diff --git a/pkg/icingadb/v1/icingadb_instance.go b/pkg/icingadb/v1/icingadb_instance.go new file mode 100644 index 0000000..d694b74 --- /dev/null +++ b/pkg/icingadb/v1/icingadb_instance.go @@ -0,0 +1,21 @@ +package v1 + +import ( + "github.com/icinga/icingadb/pkg/types" +) + +type IcingadbInstance struct { + EntityWithoutChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + EndpointId types.Binary `json:"endpoint_id"` + Heartbeat types.UnixMilli `json:"heartbeat"` + Responsible types.Bool `json:"responsible"` + Icinga2Version string `json:"icinga2_version"` + Icinga2StartTime types.UnixMilli `json:"icinga2_start_Time"` + Icinga2NotificationsEnabled types.Bool `json:"icinga2_notifications_enabled"` + Icinga2ActiveServiceChecksEnabled types.Bool `json:"icinga2_active_service_checks_enabled"` + Icinga2ActiveHostChecksEnabled types.Bool `json:"icinga2_active_host_checks_enabled"` + Icinga2EventHandlersEnabled types.Bool `json:"icinga2_event_handlers_enabled"` + Icinga2FlapDetectionEnabled types.Bool `json:"icinga2_flap_detection_enabled"` + Icinga2PerformanceDataEnabled types.Bool `json:"icinga2_performance_data_enabled"` +} diff --git a/pkg/icingadb/v1/meta.go b/pkg/icingadb/v1/meta.go new file mode 100644 index 0000000..9266751 --- /dev/null +++ b/pkg/icingadb/v1/meta.go @@ -0,0 +1,86 @@ +package v1 + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +// ChecksumMeta is embedded by every type with a checksum. +type ChecksumMeta struct { + PropertiesChecksum types.Binary `json:"checksum"` +} + +// Checksum implements part of the contracts.Checksumer interface. +func (m ChecksumMeta) Checksum() contracts.Checksum { + return m.PropertiesChecksum +} + +// SetChecksum implements part of the contracts.Checksumer interface. +func (m *ChecksumMeta) SetChecksum(checksum contracts.Checksum) { + m.PropertiesChecksum = checksum.(types.Binary) +} + +// EnvironmentMeta is embedded by every type which belongs to an environment. +type EnvironmentMeta struct { + EnvironmentId types.Binary `json:"environment_id"` +} + +// IdMeta is embedded by every type Icinga DB should synchronize. +type IdMeta struct { + Id types.Binary `json:"id"` +} + +// ID implements part of the contracts.IDer interface. +func (m IdMeta) ID() contracts.ID { + return m.Id +} + +// SetID implements part of the contracts.IDer interface. +func (m *IdMeta) SetID(id contracts.ID) { + m.Id = id.(types.Binary) +} + +// NameMeta is embedded by every type with a name. +type NameMeta struct { + Name string `json:"name"` + NameChecksum types.Binary `json:"name_checksum"` +} + +// NameCiMeta is embedded by every type with a case insensitive name. +type NameCiMeta struct { + NameMeta `json:",inline"` + NameCi *string `json:"-"` +} + +// Init implements the contracts.Initer interface. +func (n *NameCiMeta) Init() { + n.NameCi = &n.Name +} + +// CustomvarMeta is embedded by every type with custom variables. +type CustomvarMeta struct { + EntityWithoutChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + CustomvarId types.Binary `json:"customvar_id"` +} + +// GroupMeta is embedded by every type that represents a specific group. +type GroupMeta struct { + EntityWithChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + NameCiMeta `json:",inline"` + DisplayName string `json:"display_name"` + ZoneId types.Binary `json:"zone_id"` +} + +// MemberMeta is embedded by every type that represents members of a specific group. +type MemberMeta struct { + EntityWithoutChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` +} + +// Assert interface compliance. +var ( + _ contracts.Initer = (*NameCiMeta)(nil) + _ contracts.Initer = (*GroupMeta)(nil) +) diff --git a/pkg/icingadb/v1/notification.go b/pkg/icingadb/v1/notification.go new file mode 100644 index 0000000..31bdbbf --- /dev/null +++ b/pkg/icingadb/v1/notification.go @@ -0,0 +1,74 @@ +package v1 + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type Notification struct { + EntityWithChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + NameCiMeta `json:",inline"` + HostId types.Binary `json:"host_id"` + ServiceId types.Binary `json:"service_id"` + NotificationcommandId types.Binary `json:"notificationcommand_id"` + TimesBegin types.Int `json:"times_begin"` + TimesEnd types.Int `json:"times_end"` + NotificationInterval uint32 `json:"notification_interval"` + TimeperiodId types.Binary `json:"timeperiod_id"` + States types.NotificationStates `json:"states"` + Types types.NotificationTypes `json:"types"` + ZoneId types.Binary `json:"zone_id"` +} + +type NotificationUser struct { + EntityWithoutChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + NotificationId types.Binary `json:"notification_id"` + UserId types.Binary `json:"user_id"` +} + +type NotificationUsergroup struct { + EntityWithoutChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + NotificationId types.Binary `json:"notification_id"` + UsergroupId types.Binary `json:"usergroup_id"` +} + +type NotificationRecipient struct { + EntityWithoutChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + NotificationId types.Binary `json:"notification_id"` + UserId types.Binary `json:"user_id"` + UsergroupId types.Binary `json:"usergroup_id"` +} + +type NotificationCustomvar struct { + CustomvarMeta `json:",inline"` + NotificationId types.Binary `json:"notification_id"` +} + +func NewNotification() contracts.Entity { + return &Notification{} +} + +func NewNotificationUser() contracts.Entity { + return &NotificationUser{} +} + +func NewNotificationUsergroup() contracts.Entity { + return &NotificationUsergroup{} +} + +func NewNotificationRecipient() contracts.Entity { + return &NotificationRecipient{} +} + +func NewNotificationCustomvar() contracts.Entity { + return &NotificationCustomvar{} +} + +// Assert interface compliance. +var ( + _ contracts.Initer = (*Notification)(nil) +) diff --git a/pkg/icingadb/v1/overdue/host.go b/pkg/icingadb/v1/overdue/host.go new file mode 100644 index 0000000..9d42994 --- /dev/null +++ b/pkg/icingadb/v1/overdue/host.go @@ -0,0 +1,26 @@ +package overdue + +import ( + "github.com/icinga/icingadb/pkg/contracts" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/types" +) + +type HostState struct { + v1.EntityWithoutChecksum + IsOverdue types.Bool `json:"is_overdue"` +} + +func NewHostState(id string, overdue bool) (contracts.Entity, error) { + hs := &HostState{IsOverdue: types.Bool{ + Bool: overdue, + Valid: true, + }} + + return hs, hs.Id.UnmarshalText([]byte(id)) +} + +// Assert interface compliance. +var ( + _ contracts.Entity = (*HostState)(nil) +) diff --git a/pkg/icingadb/v1/overdue/service.go b/pkg/icingadb/v1/overdue/service.go new file mode 100644 index 0000000..dfd5383 --- /dev/null +++ b/pkg/icingadb/v1/overdue/service.go @@ -0,0 +1,26 @@ +package overdue + +import ( + "github.com/icinga/icingadb/pkg/contracts" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/types" +) + +type ServiceState struct { + v1.EntityWithoutChecksum + IsOverdue types.Bool `json:"is_overdue"` +} + +func NewServiceState(id string, overdue bool) (contracts.Entity, error) { + hs := &ServiceState{IsOverdue: types.Bool{ + Bool: overdue, + Valid: true, + }} + + return hs, hs.Id.UnmarshalText([]byte(id)) +} + +// Assert interface compliance. +var ( + _ contracts.Entity = (*ServiceState)(nil) +) diff --git a/pkg/icingadb/v1/service.go b/pkg/icingadb/v1/service.go new file mode 100644 index 0000000..4045449 --- /dev/null +++ b/pkg/icingadb/v1/service.go @@ -0,0 +1,67 @@ +package v1 + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type Service struct { + Checkable `json:",inline"` + HostId types.Binary `json:"host_id"` +} + +type ServiceCustomvar struct { + CustomvarMeta `json:",inline"` + ServiceId types.Binary `json:"service_id"` +} + +type ServiceState struct { + State `json:",inline"` + ServiceId types.Binary `json:"service_id"` + HostId types.Binary `json:"host_id"` +} + +type Servicegroup struct { + GroupMeta `json:",inline"` +} + +type ServicegroupCustomvar struct { + CustomvarMeta `json:",inline"` + ServicegroupId types.Binary `json:"servicegroup_id"` +} + +type ServicegroupMember struct { + MemberMeta `json:",inline"` + ServiceId types.Binary `json:"service_id"` + ServicegroupId types.Binary `json:"servicegroup_id"` +} + +func NewService() contracts.Entity { + return &Service{} +} + +func NewServiceCustomvar() contracts.Entity { + return &ServiceCustomvar{} +} + +func NewServiceState() contracts.Entity { + return &ServiceState{} +} + +func NewServicegroup() contracts.Entity { + return &Servicegroup{} +} + +func NewServicegroupCustomvar() contracts.Entity { + return &ServicegroupCustomvar{} +} + +func NewServicegroupMember() contracts.Entity { + return &ServicegroupMember{} +} + +// Assert interface compliance. +var ( + _ contracts.Initer = (*Service)(nil) + _ contracts.Initer = (*Servicegroup)(nil) +) diff --git a/pkg/icingadb/v1/state.go b/pkg/icingadb/v1/state.go new file mode 100644 index 0000000..bad8f28 --- /dev/null +++ b/pkg/icingadb/v1/state.go @@ -0,0 +1,39 @@ +package v1 + +import ( + "github.com/icinga/icingadb/pkg/types" +) + +type State struct { + EntityWithChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + AcknowledgementCommentId types.Binary `json:"acknowledgement_comment_id"` + LastCommentId types.Binary `json:"last_comment_id"` + CheckAttempt uint8 `json:"check_attempt"` + CheckCommandline types.String `json:"check_commandline"` + CheckSource types.String `json:"check_source"` + SchedulingSource types.String `json:"scheduling_source"` + ExecutionTime float64 `json:"execution_time"` + HardState uint8 `json:"hard_state"` + InDowntime types.Bool `json:"in_downtime"` + IsAcknowledged types.AcknowledgementState `json:"is_acknowledged"` + IsFlapping types.Bool `json:"is_flapping"` + IsHandled types.Bool `json:"is_handled"` + IsProblem types.Bool `json:"is_problem"` + IsReachable types.Bool `json:"is_reachable"` + LastStateChange types.UnixMilli `json:"last_state_change"` + LastUpdate types.UnixMilli `json:"last_update"` + Latency float64 `json:"latency"` + LongOutput types.String `json:"long_output"` + NextCheck types.UnixMilli `json:"next_check"` + NextUpdate types.UnixMilli `json:"next_update"` + Output types.String `json:"output"` + PerformanceData types.String `json:"performance_data"` + NormalizedPerformanceData types.String `json:"normalized_performance_data"` + PreviousSoftState uint8 `json:"previous_soft_state"` + PreviousHardState uint8 `json:"previous_hard_state"` + Severity uint16 `json:"severity"` + SoftState uint8 `json:"soft_state"` + StateType types.StateType `json:"state_type"` + CheckTimeout float64 `json:"check_timeout"` +} diff --git a/pkg/icingadb/v1/timeperiod.go b/pkg/icingadb/v1/timeperiod.go new file mode 100644 index 0000000..06a3bd2 --- /dev/null +++ b/pkg/icingadb/v1/timeperiod.go @@ -0,0 +1,67 @@ +package v1 + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type Timeperiod struct { + EntityWithChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + NameCiMeta `json:",inline"` + DisplayName string `json:"display_name"` + PreferIncludes types.Bool `json:"prefer_includes"` + ZoneId types.Binary `json:"zone_id"` +} + +type TimeperiodRange struct { + EntityWithoutChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + TimeperiodId types.Binary `json:"timeperiod_id"` + RangeKey string `json:"range_key"` + RangeValue string `json:"range_value"` +} + +type TimeperiodOverrideInclude struct { + EntityWithoutChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + TimeperiodId types.Binary `json:"timeperiod_id"` + OverrideId types.Binary `json:"include_id"` +} + +type TimeperiodOverrideExclude struct { + EntityWithoutChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + TimeperiodId types.Binary `json:"timeperiod_id"` + OverrideId types.Binary `json:"exclude_id"` +} + +type TimeperiodCustomvar struct { + CustomvarMeta `json:",inline"` + TimeperiodId types.Binary `json:"timeperiod_id"` +} + +func NewTimeperiod() contracts.Entity { + return &Timeperiod{} +} + +func NewTimeperiodRange() contracts.Entity { + return &TimeperiodRange{} +} + +func NewTimeperiodOverrideInclude() contracts.Entity { + return &TimeperiodOverrideInclude{} +} + +func NewTimeperiodOverrideExclude() contracts.Entity { + return &TimeperiodOverrideExclude{} +} + +func NewTimeperiodCustomvar() contracts.Entity { + return &TimeperiodCustomvar{} +} + +// Assert interface compliance. +var ( + _ contracts.Initer = (*Timeperiod)(nil) +) diff --git a/pkg/icingadb/v1/url.go b/pkg/icingadb/v1/url.go new file mode 100644 index 0000000..cf70abc --- /dev/null +++ b/pkg/icingadb/v1/url.go @@ -0,0 +1,33 @@ +package v1 + +import "github.com/icinga/icingadb/pkg/contracts" + +type ActionUrl struct { + EntityWithoutChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + ActionUrl string `json:"action_url"` +} + +type NotesUrl struct { + EntityWithoutChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + NotesUrl string `json:"notes_url"` +} + +type IconImage struct { + EntityWithoutChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + IconImage string `json:"icon_image"` +} + +func NewActionUrl() contracts.Entity { + return &ActionUrl{} +} + +func NewNotesUrl() contracts.Entity { + return &NotesUrl{} +} + +func NewIconImage() contracts.Entity { + return &IconImage{} +} diff --git a/pkg/icingadb/v1/user.go b/pkg/icingadb/v1/user.go new file mode 100644 index 0000000..6a44050 --- /dev/null +++ b/pkg/icingadb/v1/user.go @@ -0,0 +1,66 @@ +package v1 + +import ( + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" +) + +type User struct { + EntityWithChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + NameCiMeta `json:",inline"` + DisplayName string `json:"display_name"` + Email string `json:"email"` + Pager string `json:"pager"` + NotificationsEnabled types.Bool `json:"notifications_enabled"` + TimeperiodId types.Binary `json:"timeperiod_id"` + States types.NotificationStates `json:"states"` + Types types.NotificationTypes `json:"types"` + ZoneId types.Binary `json:"zone_id"` +} + +type UserCustomvar struct { + CustomvarMeta `json:",inline"` + UserId types.Binary `json:"user_id"` +} + +type Usergroup struct { + GroupMeta `json:",inline"` +} + +type UsergroupCustomvar struct { + CustomvarMeta `json:",inline"` + UsergroupId types.Binary `json:"usergroup_id"` +} + +type UsergroupMember struct { + MemberMeta `json:",inline"` + UserId types.Binary `json:"user_id"` + UsergroupId types.Binary `json:"usergroup_id"` +} + +func NewUser() contracts.Entity { + return &User{} +} + +func NewUserCustomvar() contracts.Entity { + return &UserCustomvar{} +} + +func NewUsergroup() contracts.Entity { + return &Usergroup{} +} + +func NewUsergroupCustomvar() contracts.Entity { + return &UsergroupCustomvar{} +} + +func NewUsergroupMember() contracts.Entity { + return &UsergroupMember{} +} + +// Assert interface compliance. +var ( + _ contracts.Initer = (*User)(nil) + _ contracts.Initer = (*Usergroup)(nil) +) diff --git a/pkg/icingadb/v1/v1.go b/pkg/icingadb/v1/v1.go new file mode 100644 index 0000000..af19fdf --- /dev/null +++ b/pkg/icingadb/v1/v1.go @@ -0,0 +1,58 @@ +package v1 + +import ( + "github.com/icinga/icingadb/pkg/contracts" +) + +var StateFactories = []contracts.EntityFactoryFunc{NewHostState, NewServiceState} + +var ConfigFactories = []contracts.EntityFactoryFunc{ + NewActionUrl, + NewCheckcommand, + NewCheckcommandArgument, + NewCheckcommandCustomvar, + NewCheckcommandEnvvar, + NewComment, + NewDowntime, + NewEndpoint, + NewEventcommand, + NewEventcommandArgument, + NewEventcommandCustomvar, + NewEventcommandEnvvar, + NewHost, + NewHostCustomvar, + NewHostgroup, + NewHostgroupCustomvar, + NewHostgroupMember, + NewIconImage, + NewNotesUrl, + NewNotification, + NewNotificationcommand, + NewNotificationcommandArgument, + NewNotificationcommandCustomvar, + NewNotificationcommandEnvvar, + NewNotificationCustomvar, + NewNotificationRecipient, + NewNotificationUser, + NewNotificationUsergroup, + NewService, + NewServiceCustomvar, + NewServicegroup, + NewServicegroupCustomvar, + NewServicegroupMember, + NewTimeperiod, + NewTimeperiodCustomvar, + NewTimeperiodOverrideExclude, + NewTimeperiodOverrideInclude, + NewTimeperiodRange, + NewUser, + NewUserCustomvar, + NewUsergroup, + NewUsergroupCustomvar, + NewUsergroupMember, + NewZone, +} + +// contextKey is an unexported type for context keys defined in this package. +// This prevents collisions with keys defined in other packages. +type contextKey int diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go new file mode 100644 index 0000000..d42713c --- /dev/null +++ b/pkg/icingaredis/client.go @@ -0,0 +1,243 @@ +package icingaredis + +import ( + "context" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/common" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" + "runtime" + "time" +) + +// Client is a wrapper around redis.Client with +// streaming and logging capabilities. +type Client struct { + *redis.Client + + Options *Options + + logger *logging.Logger +} + +// Options define user configurable Redis options. +type Options struct { + BlockTimeout time.Duration `yaml:"block_timeout" default:"1s"` + HMGetCount int `yaml:"hmget_count" default:"4096"` + HScanCount int `yaml:"hscan_count" default:"4096"` + MaxHMGetConnections int `yaml:"max_hmget_connections" default:"8"` + Timeout time.Duration `yaml:"timeout" default:"30s"` + XReadCount int `yaml:"xread_count" default:"4096"` +} + +// Validate checks constraints in the supplied Redis options and returns an error if they are violated. +func (o *Options) Validate() error { + if o.BlockTimeout <= 0 { + return errors.New("block_timeout must be positive") + } + if o.HMGetCount < 1 { + return errors.New("hmget_count must be at least 1") + } + if o.HScanCount < 1 { + return errors.New("hscan_count must be at least 1") + } + if o.MaxHMGetConnections < 1 { + return errors.New("max_hmget_connections must be at least 1") + } + if o.Timeout == 0 { + return errors.New("timeout cannot be 0. Configure a value greater than zero, or use -1 for no timeout") + } + if o.XReadCount < 1 { + return errors.New("xread_count must be at least 1") + } + + return nil +} + +// NewClient returns a new icingaredis.Client wrapper for a pre-existing *redis.Client. +func NewClient(client *redis.Client, logger *logging.Logger, options *Options) *Client { + return &Client{Client: client, logger: logger, Options: options} +} + +// HPair defines Redis hashes field-value pairs. +type HPair struct { + Field string + Value string +} + +// HYield yields HPair field-value pairs for all fields in the hash stored at key. +func (c *Client) HYield(ctx context.Context, key string) (<-chan HPair, <-chan error) { + pairs := make(chan HPair, c.Options.HScanCount) + + return pairs, com.WaitAsync(contracts.WaiterFunc(func() error { + var counter com.Counter + defer c.log(ctx, key, &counter).Stop() + defer close(pairs) + + seen := make(map[string]struct{}) + + var cursor uint64 + var err error + var page []string + + for { + cmd := c.HScan(ctx, key, cursor, "", int64(c.Options.HScanCount)) + page, cursor, err = cmd.Result() + + if err != nil { + return WrapCmdErr(cmd) + } + + for i := 0; i < len(page); i += 2 { + if _, ok := seen[page[i]]; ok { + // Ignore duplicate returned by HSCAN. + continue + } + + seen[page[i]] = struct{}{} + + select { + case pairs <- HPair{ + Field: page[i], + Value: page[i+1], + }: + counter.Inc() + case <-ctx.Done(): + return ctx.Err() + } + } + + if cursor == 0 { + break + } + } + + return nil + })) +} + +// HMYield yields HPair field-value pairs for the specified fields in the hash stored at key. +func (c *Client) HMYield(ctx context.Context, key string, fields ...string) (<-chan HPair, <-chan error) { + pairs := make(chan HPair) + + return pairs, com.WaitAsync(contracts.WaiterFunc(func() error { + var counter com.Counter + defer c.log(ctx, key, &counter).Stop() + + g, ctx := errgroup.WithContext(ctx) + + defer func() { + // Wait until the group is done so that we can safely close the pairs channel, + // because on error, sem.Acquire will return before calling g.Wait(), + // which can result in goroutines working on a closed channel. + _ = g.Wait() + close(pairs) + }() + + // Use context from group. + batches := utils.BatchSliceOfStrings(ctx, fields, c.Options.HMGetCount) + + sem := semaphore.NewWeighted(int64(c.Options.MaxHMGetConnections)) + + for batch := range batches { + if err := sem.Acquire(ctx, 1); err != nil { + return errors.Wrap(err, "can't acquire semaphore") + } + + batch := batch + g.Go(func() error { + defer sem.Release(1) + + cmd := c.HMGet(ctx, key, batch...) + vals, err := cmd.Result() + + if err != nil { + return WrapCmdErr(cmd) + } + + for i, v := range vals { + if v == nil { + c.logger.Warnf("HMGET %s: field %#v missing", key, batch[i]) + continue + } + + select { + case pairs <- HPair{ + Field: batch[i], + Value: v.(string), + }: + counter.Inc() + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil + }) + } + + return g.Wait() + })) +} + +// XReadUntilResult (repeatedly) calls XREAD with the specified arguments until a result is returned. +// Each call blocks at most for the duration specified in Options.BlockTimeout until data +// is available before it times out and the next call is made. +// This also means that an already set block timeout is overridden. +func (c *Client) XReadUntilResult(ctx context.Context, a *redis.XReadArgs) ([]redis.XStream, error) { + a.Block = c.Options.BlockTimeout + + for { + cmd := c.XRead(ctx, a) + streams, err := cmd.Result() + if err != nil { + if errors.Is(err, redis.Nil) { + continue + } + + return streams, WrapCmdErr(cmd) + } + + return streams, nil + } +} + +// YieldAll yields all entities from Redis that belong to the specified SyncSubject. +func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-chan contracts.Entity, <-chan error) { + key := utils.Key(utils.Name(subject.Entity()), ':') + if subject.WithChecksum() { + key = "icinga:checksum:" + key + } else { + key = "icinga:" + key + } + + pairs, errs := c.HYield(ctx, key) + g, ctx := errgroup.WithContext(ctx) + // Let errors from HYield cancel the group. + com.ErrgroupReceive(g, errs) + + desired, errs := CreateEntities(ctx, subject.FactoryForDelta(), pairs, runtime.NumCPU()) + // Let errors from CreateEntities cancel the group. + com.ErrgroupReceive(g, errs) + + return desired, com.WaitAsync(g) +} + +func (c *Client) log(ctx context.Context, key string, counter *com.Counter) periodic.Stopper { + return periodic.Start(ctx, c.logger.Interval(), func(tick periodic.Tick) { + // We may never get to progress logging here, + // as fetching should be completed before the interval expires, + // but if it does, it is good to have this log message. + if count := counter.Reset(); count > 0 { + c.logger.Debugf("Fetched %d items from %s", count, key) + } + }, periodic.OnStop(func(tick periodic.Tick) { + c.logger.Debugf("Finished fetching from %s with %d items in %s", key, counter.Total(), tick.Elapsed) + })) +} diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go new file mode 100644 index 0000000..9a8ebad --- /dev/null +++ b/pkg/icingaredis/heartbeat.go @@ -0,0 +1,221 @@ +package icingaredis + +import ( + "context" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/internal" + v1 "github.com/icinga/icingadb/pkg/icingaredis/v1" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/types" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "sync" + "sync/atomic" + "time" +) + +// timeout defines how long a heartbeat may be absent if a heartbeat has already been received. +// After this time, a heartbeat loss is propagated. +var timeout = 60 * time.Second + +// Heartbeat periodically reads heartbeats from a Redis stream and signals in Beat channels when they are received. +// Also signals on if the heartbeat is Lost. +type Heartbeat struct { + active bool + events chan *HeartbeatMessage + lastReceivedMs int64 + cancelCtx context.CancelFunc + client *Client + done chan struct{} + errMu sync.Mutex + err error + logger *logging.Logger +} + +// NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop. +func NewHeartbeat(ctx context.Context, client *Client, logger *logging.Logger) *Heartbeat { + ctx, cancelCtx := context.WithCancel(ctx) + + heartbeat := &Heartbeat{ + events: make(chan *HeartbeatMessage, 1), + cancelCtx: cancelCtx, + client: client, + done: make(chan struct{}), + logger: logger, + } + + go heartbeat.controller(ctx) + + return heartbeat +} + +// Events returns a channel that is sent to on heartbeat events. +// +// A non-nil pointer signals that a heartbeat was received from Icinga 2 whereas a nil pointer signals a heartbeat loss. +func (h *Heartbeat) Events() <-chan *HeartbeatMessage { + return h.events +} + +// LastReceived returns the last heartbeat's receive time in ms. +func (h *Heartbeat) LastReceived() int64 { + return atomic.LoadInt64(&h.lastReceivedMs) +} + +// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any. +// Implements the io.Closer interface. +func (h *Heartbeat) Close() error { + h.cancelCtx() + <-h.Done() + + return h.Err() +} + +// Done returns a channel that will be closed when the heartbeat controller loop has ended. +func (h *Heartbeat) Done() <-chan struct{} { + return h.done +} + +// Err returns an error if Done has been closed and there is an error. Otherwise returns nil. +func (h *Heartbeat) Err() error { + h.errMu.Lock() + defer h.errMu.Unlock() + + return h.err +} + +// controller loop. +func (h *Heartbeat) controller(ctx context.Context) { + defer close(h.done) + + messages := make(chan *HeartbeatMessage) + defer close(messages) + + g, ctx := errgroup.WithContext(ctx) + + // Message producer loop. + g.Go(func() error { + // We expect heartbeats every second but only read them every 3 seconds. + throttle := time.NewTicker(time.Second * 3) + defer throttle.Stop() + + for id := "$"; ; { + streams, err := h.client.XReadUntilResult(ctx, &redis.XReadArgs{ + Streams: []string{"icinga:stats", id}, + }) + if err != nil { + return errors.Wrap(err, "can't read Icinga heartbeat") + } + + m := &HeartbeatMessage{ + received: time.Now(), + stats: streams[0].Messages[0].Values, + } + + select { + case messages <- m: + case <-ctx.Done(): + return ctx.Err() + } + + id = streams[0].Messages[0].ID + + <-throttle.C + } + }) + + // State loop. + g.Go(func() error { + for { + select { + case m := <-messages: + if !h.active { + envId, err := m.EnvironmentID() + if err != nil { + return err + } + h.logger.Infow("Received Icinga heartbeat", zap.String("environment", envId.String())) + h.active = true + } + + atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli()) + h.sendEvent(m) + case <-time.After(timeout): + if h.active { + h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", timeout)) + h.sendEvent(nil) + h.active = false + } else { + h.logger.Warn("Waiting for Icinga heartbeat") + } + + atomic.StoreInt64(&h.lastReceivedMs, 0) + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + // Since the goroutines of the group actually run endlessly, + // we wait here forever, unless an error occurs. + if err := g.Wait(); err != nil && !utils.IsContextCanceled(err) { + // Do not propagate any context-canceled errors here, + // as this is to be expected when calling Close or + // when the parent context is canceled. + h.setError(err) + } +} + +func (h *Heartbeat) setError(err error) { + h.errMu.Lock() + defer h.errMu.Unlock() + + h.err = errors.Wrap(err, "heartbeat failed") +} + +func (h *Heartbeat) sendEvent(m *HeartbeatMessage) { + // Remove any not yet delivered event + select { + case old := <-h.events: + if old != nil { + kv := []any{zap.Time("previous", old.received)} + if m != nil { + kv = append(kv, zap.Time("current", m.received)) + } + + h.logger.Debugw("Previous heartbeat not read from channel", kv...) + } else { + h.logger.Debug("Previous heartbeat loss event not read from channel") + } + default: + } + + h.events <- m +} + +// HeartbeatMessage represents a heartbeat received from Icinga 2 together with a timestamp when it was received. +type HeartbeatMessage struct { + received time.Time + stats v1.StatsMessage +} + +// Stats returns the underlying heartbeat message from the icinga:stats stream. +func (m *HeartbeatMessage) Stats() *v1.StatsMessage { + return &m.stats +} + +// EnvironmentID returns the Icinga DB environment ID stored in the heartbeat message. +func (m *HeartbeatMessage) EnvironmentID() (types.Binary, error) { + var id types.Binary + err := internal.UnmarshalJSON([]byte(m.stats["icingadb_environment"].(string)), &id) + if err != nil { + return nil, err + } + return id, nil +} + +// ExpiryTime returns the timestamp when the heartbeat expires. +func (m *HeartbeatMessage) ExpiryTime() time.Time { + return m.received.Add(timeout) +} diff --git a/pkg/icingaredis/telemetry/heartbeat.go b/pkg/icingaredis/telemetry/heartbeat.go new file mode 100644 index 0000000..ee476a1 --- /dev/null +++ b/pkg/icingaredis/telemetry/heartbeat.go @@ -0,0 +1,203 @@ +package telemetry + +import ( + "context" + "fmt" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "go.uber.org/zap" + "regexp" + "runtime/metrics" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" +) + +// ha represents icingadb.HA to avoid import cycles. +type ha interface { + State() (weResponsibleMilli int64, weResponsible, otherResponsible bool) +} + +type SuccessfulSync struct { + FinishMilli int64 + DurationMilli int64 +} + +// currentDbConnErr stores ongoing errors from database connections. +var currentDbConnErr struct { + mu sync.Mutex + message string + sinceMilli int64 +} + +// UpdateCurrentDbConnErr updates the current error information stored in currentDbConnErr. +func UpdateCurrentDbConnErr(err error) { + now := time.Now().UnixMilli() + + currentDbConnErr.mu.Lock() + defer currentDbConnErr.mu.Unlock() + + if currentDbConnErr.sinceMilli >= now { + // Already updated with a more recent error, ignore this one. + return + } + + message := "" + if err != nil { + message = err.Error() + } + + if currentDbConnErr.message == message { + // Error stayed the same, no update needed, keeping the old timestamp. + return + } + + if currentDbConnErr.message == "" || message == "" { + // Either first error or recovery from an error, update timestamp. + currentDbConnErr.sinceMilli = now + } + + currentDbConnErr.message = message +} + +// GetCurrentDbConnErr returns the last error message (or the empty string if not in an error state) and a timestamp in +// milliseconds of the last change from OK to error or from error to OK. +func GetCurrentDbConnErr() (string, int64) { + currentDbConnErr.mu.Lock() + defer currentDbConnErr.mu.Unlock() + + return currentDbConnErr.message, currentDbConnErr.sinceMilli +} + +// OngoingSyncStartMilli is to be updated by the main() function. +var OngoingSyncStartMilli int64 + +// LastSuccessfulSync is to be updated by the main() function. +var LastSuccessfulSync com.Atomic[SuccessfulSync] + +var boolToStr = map[bool]string{false: "0", true: "1"} +var startTime = time.Now().UnixMilli() + +// StartHeartbeat periodically writes heartbeats to Redis for being monitored by Icinga 2. +func StartHeartbeat( + ctx context.Context, client *icingaredis.Client, logger *logging.Logger, ha ha, heartbeat *icingaredis.Heartbeat, +) { + goMetrics := NewGoMetrics() + + const interval = time.Second + + var lastErr string + var silenceUntil time.Time + + periodic.Start(ctx, interval, func(tick periodic.Tick) { + heartbeat := heartbeat.LastReceived() + responsibleTsMilli, responsible, otherResponsible := ha.State() + ongoingSyncStart := atomic.LoadInt64(&OngoingSyncStartMilli) + sync, _ := LastSuccessfulSync.Load() + dbConnErr, dbConnErrSinceMilli := GetCurrentDbConnErr() + now := time.Now() + + values := map[string]string{ + "version": internal.Version.Version, + "time": strconv.FormatInt(now.UnixMilli(), 10), + "start-time": strconv.FormatInt(startTime, 10), + "error": dbConnErr, + "error-since": strconv.FormatInt(dbConnErrSinceMilli, 10), + "performance-data": goMetrics.PerformanceData(), + "last-heartbeat-received": strconv.FormatInt(heartbeat, 10), + "ha-responsible": boolToStr[responsible], + "ha-responsible-ts": strconv.FormatInt(responsibleTsMilli, 10), + "ha-other-responsible": boolToStr[otherResponsible], + "sync-ongoing-since": strconv.FormatInt(ongoingSyncStart, 10), + "sync-success-finish": strconv.FormatInt(sync.FinishMilli, 10), + "sync-success-duration": strconv.FormatInt(sync.DurationMilli, 10), + } + + ctx, cancel := context.WithDeadline(ctx, tick.Time.Add(interval)) + defer cancel() + + cmd := client.XAdd(ctx, &redis.XAddArgs{ + Stream: "icingadb:telemetry:heartbeat", + MaxLen: 1, + Values: values, + }) + if err := cmd.Err(); err != nil && !utils.IsContextCanceled(err) && !errors.Is(err, context.DeadlineExceeded) { + logw := logger.Debugw + currentErr := err.Error() + + if currentErr != lastErr || now.After(silenceUntil) { + logw = logger.Warnw + lastErr = currentErr + silenceUntil = now.Add(time.Minute) + } + + logw("Can't update own heartbeat", zap.Error(icingaredis.WrapCmdErr(cmd))) + } else { + lastErr = "" + silenceUntil = time.Time{} + } + }) +} + +type goMetrics struct { + names []string + units []string + samples []metrics.Sample +} + +func NewGoMetrics() *goMetrics { + m := &goMetrics{} + + forbiddenRe := regexp.MustCompile(`\W`) + + for _, d := range metrics.All() { + switch d.Kind { + case metrics.KindUint64, metrics.KindFloat64: + name := "go_" + strings.TrimLeft(forbiddenRe.ReplaceAllString(d.Name, "_"), "_") + + unit := "" + if strings.HasSuffix(d.Name, ":bytes") { + unit = "B" + } else if strings.HasSuffix(d.Name, ":seconds") { + unit = "s" + } else if d.Cumulative { + unit = "c" + } + + m.names = append(m.names, name) + m.units = append(m.units, unit) + m.samples = append(m.samples, metrics.Sample{Name: d.Name}) + } + } + + return m +} + +func (g *goMetrics) PerformanceData() string { + metrics.Read(g.samples) + + var buf strings.Builder + + for i, sample := range g.samples { + if i > 0 { + buf.WriteByte(' ') + } + + switch sample.Value.Kind() { + case metrics.KindUint64: + _, _ = fmt.Fprintf(&buf, "%s=%d%s", g.names[i], sample.Value.Uint64(), g.units[i]) + case metrics.KindFloat64: + _, _ = fmt.Fprintf(&buf, "%s=%f%s", g.names[i], sample.Value.Float64(), g.units[i]) + } + } + + return buf.String() +} diff --git a/pkg/icingaredis/telemetry/stats.go b/pkg/icingaredis/telemetry/stats.go new file mode 100644 index 0000000..86db0b3 --- /dev/null +++ b/pkg/icingaredis/telemetry/stats.go @@ -0,0 +1,51 @@ +package telemetry + +import ( + "context" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" + "github.com/icinga/icingadb/pkg/utils" + "go.uber.org/zap" + "strconv" + "time" +) + +var Stats struct { + // Config & co. are to be increased by the T sync once for every T object synced. + Config, State, History, Overdue, HistoryCleanup com.Counter +} + +// WriteStats periodically forwards Stats to Redis for being monitored by Icinga 2. +func WriteStats(ctx context.Context, client *icingaredis.Client, logger *logging.Logger) { + counters := map[string]*com.Counter{ + "config_sync": &Stats.Config, + "state_sync": &Stats.State, + "history_sync": &Stats.History, + "overdue_sync": &Stats.Overdue, + "history_cleanup": &Stats.HistoryCleanup, + } + + periodic.Start(ctx, time.Second, func(_ periodic.Tick) { + var data []string + for kind, counter := range counters { + if cnt := counter.Reset(); cnt > 0 { + data = append(data, kind, strconv.FormatUint(cnt, 10)) + } + } + + if data != nil { + cmd := client.XAdd(ctx, &redis.XAddArgs{ + Stream: "icingadb:telemetry:stats", + MaxLen: 15 * 60, + Approx: true, + Values: data, + }) + if err := cmd.Err(); err != nil && !utils.IsContextCanceled(err) { + logger.Warnw("Can't update own stats", zap.Error(icingaredis.WrapCmdErr(cmd))) + } + } + }) +} diff --git a/pkg/icingaredis/utils.go b/pkg/icingaredis/utils.go new file mode 100644 index 0000000..9176dba --- /dev/null +++ b/pkg/icingaredis/utils.go @@ -0,0 +1,128 @@ +package icingaredis + +import ( + "context" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" +) + +// Streams represents a Redis stream key to ID mapping. +type Streams map[string]string + +// Option returns the Redis stream key to ID mapping +// as a slice of stream keys followed by their IDs +// that is compatible for the Redis STREAMS option. +func (s Streams) Option() []string { + // len*2 because we're appending the IDs later. + streams := make([]string, 0, len(s)*2) + ids := make([]string, 0, len(s)) + + for key, id := range s { + streams = append(streams, key) + ids = append(ids, id) + } + + return append(streams, ids...) +} + +// CreateEntities streams and creates entities from the +// given Redis field value pairs using the specified factory function, +// and streams them on a returned channel. +func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, pairs <-chan HPair, concurrent int) (<-chan contracts.Entity, <-chan error) { + entities := make(chan contracts.Entity) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + defer close(entities) + + g, ctx := errgroup.WithContext(ctx) + + for i := 0; i < concurrent; i++ { + g.Go(func() error { + for pair := range pairs { + var id types.Binary + + if err := id.UnmarshalText([]byte(pair.Field)); err != nil { + return errors.Wrapf(err, "can't create ID from value %#v", pair.Field) + } + + e := factoryFunc() + if err := internal.UnmarshalJSON([]byte(pair.Value), e); err != nil { + return err + } + e.SetID(id) + + select { + case entities <- e: + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil + }) + } + + return g.Wait() + }) + + return entities, com.WaitAsync(g) +} + +// SetChecksums concurrently streams from the given entities and +// sets their checksums using the specified map and +// streams the results on a returned channel. +func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums map[string]contracts.Entity, concurrent int) (<-chan contracts.Entity, <-chan error) { + entitiesWithChecksum := make(chan contracts.Entity) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + defer close(entitiesWithChecksum) + + g, ctx := errgroup.WithContext(ctx) + + for i := 0; i < concurrent; i++ { + g.Go(func() error { + for entity := range entities { + if checksumer, ok := checksums[entity.ID().String()]; ok { + entity.(contracts.Checksumer).SetChecksum(checksumer.(contracts.Checksumer).Checksum()) + } else { + return errors.Errorf("no checksum for %#v", entity) + } + + select { + case entitiesWithChecksum <- entity: + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil + }) + } + + return g.Wait() + }) + + return entitiesWithChecksum, com.WaitAsync(g) +} + +// WrapCmdErr adds the command itself and +// the stack of the current goroutine to the command's error if any. +func WrapCmdErr(cmd redis.Cmder) error { + err := cmd.Err() + if err != nil { + err = errors.Wrapf(err, "can't perform %q", utils.Ellipsize( + redis.NewCmd(context.Background(), cmd.Args()).String(), // Omits error in opposite to cmd.String() + 100, + )) + } + + return err +} diff --git a/pkg/icingaredis/v1/icinga_status.go b/pkg/icingaredis/v1/icinga_status.go new file mode 100644 index 0000000..d94d3d6 --- /dev/null +++ b/pkg/icingaredis/v1/icinga_status.go @@ -0,0 +1,21 @@ +package v1 + +import ( + "github.com/icinga/icingadb/pkg/types" +) + +// IcingaStatus defines Icinga status information. +type IcingaStatus struct { + // Note: Icinga2Environment is not related to the environment_id used throughout Icinga DB. + Icinga2Environment string `json:"environment"` + NodeName string `json:"node_name"` + Version string `json:"version"` + ProgramStart types.UnixMilli `json:"program_start"` + EndpointId types.Binary `json:"endpoint_id"` + NotificationsEnabled types.Bool `json:"enable_notifications"` + ActiveServiceChecksEnabled types.Bool `json:"enable_service_checks"` + ActiveHostChecksEnabled types.Bool `json:"enable_host_checks"` + EventHandlersEnabled types.Bool `json:"enable_event_handlers"` + FlapDetectionEnabled types.Bool `json:"enable_flapping"` + PerformanceDataEnabled types.Bool `json:"enable_perfdata"` +} diff --git a/pkg/icingaredis/v1/stats_message.go b/pkg/icingaredis/v1/stats_message.go new file mode 100644 index 0000000..5b04629 --- /dev/null +++ b/pkg/icingaredis/v1/stats_message.go @@ -0,0 +1,51 @@ +package v1 + +import ( + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/types" + "github.com/pkg/errors" +) + +// StatsMessage represents a message from the Redis stream icinga:stats. +type StatsMessage map[string]interface{} + +// Raw returns the key-value pairs of the message. +func (m StatsMessage) Raw() map[string]interface{} { + return m +} + +// IcingaStatus extracts Icinga status information from the message into IcingaStatus and returns it. +func (m StatsMessage) IcingaStatus() (*IcingaStatus, error) { + if s, ok := m["IcingaApplication"].(string); ok { + var envelope struct { + Status struct { + IcingaApplication struct { + IcingaStatus `json:"app"` + } `json:"icingaapplication"` + } `json:"status"` + } + + if err := internal.UnmarshalJSON([]byte(s), &envelope); err != nil { + return nil, err + } + + return &envelope.Status.IcingaApplication.IcingaStatus, nil + } + + return nil, errors.Errorf(`bad message %#v. "IcingaApplication" missing`, m) +} + +// Time extracts the timestamp of the message into types.UnixMilli and returns it. +func (m StatsMessage) Time() (*types.UnixMilli, error) { + if s, ok := m["timestamp"].(string); ok { + var t types.UnixMilli + + if err := internal.UnmarshalJSON([]byte(s), &t); err != nil { + return nil, err + } + + return &t, nil + } + + return nil, errors.Errorf(`bad message %#v. "timestamp" missing`, m) +} diff --git a/pkg/logging/journald_core.go b/pkg/logging/journald_core.go new file mode 100644 index 0000000..50dd39f --- /dev/null +++ b/pkg/logging/journald_core.go @@ -0,0 +1,85 @@ +package logging + +import ( + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "github.com/ssgreg/journald" + "go.uber.org/zap/zapcore" + "strings" + "unicode" +) + +// priorities maps zapcore.Level to journal.Priority. +var priorities = map[zapcore.Level]journald.Priority{ + zapcore.DebugLevel: journald.PriorityDebug, + zapcore.InfoLevel: journald.PriorityInfo, + zapcore.WarnLevel: journald.PriorityWarning, + zapcore.ErrorLevel: journald.PriorityErr, + zapcore.FatalLevel: journald.PriorityCrit, + zapcore.PanicLevel: journald.PriorityCrit, + zapcore.DPanicLevel: journald.PriorityCrit, +} + +// NewJournaldCore returns a zapcore.Core that sends log entries to systemd-journald and +// uses the given identifier as a prefix for structured logging context that is sent as journal fields. +func NewJournaldCore(identifier string, enab zapcore.LevelEnabler) zapcore.Core { + return &journaldCore{ + LevelEnabler: enab, + identifier: identifier, + identifierU: strings.ToUpper(identifier), + } +} + +type journaldCore struct { + zapcore.LevelEnabler + context []zapcore.Field + identifier string + identifierU string +} + +func (c *journaldCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + if c.Enabled(ent.Level) { + return ce.AddCore(ent, c) + } + + return ce +} + +func (c *journaldCore) Sync() error { + return nil +} + +func (c *journaldCore) With(fields []zapcore.Field) zapcore.Core { + cc := *c + cc.context = append(cc.context[:len(cc.context):len(cc.context)], fields...) + + return &cc +} + +func (c *journaldCore) Write(ent zapcore.Entry, fields []zapcore.Field) error { + pri, ok := priorities[ent.Level] + if !ok { + return errors.Errorf("unknown log level %q", ent.Level) + } + + enc := zapcore.NewMapObjectEncoder() + c.addFields(enc, fields) + c.addFields(enc, c.context) + enc.Fields["SYSLOG_IDENTIFIER"] = c.identifier + + message := ent.Message + if ent.LoggerName != c.identifier { + message = ent.LoggerName + ": " + message + } + + return journald.Send(message, pri, enc.Fields) +} + +func (c *journaldCore) addFields(enc zapcore.ObjectEncoder, fields []zapcore.Field) { + for _, field := range fields { + field.Key = c.identifierU + + "_" + + utils.ConvertCamelCase(field.Key, unicode.UpperCase, '_') + field.AddTo(enc) + } +} diff --git a/pkg/logging/logger.go b/pkg/logging/logger.go new file mode 100644 index 0000000..490445e --- /dev/null +++ b/pkg/logging/logger.go @@ -0,0 +1,26 @@ +package logging + +import ( + "go.uber.org/zap" + "time" +) + +// Logger wraps zap.SugaredLogger and +// allows to get the interval for periodic logging. +type Logger struct { + *zap.SugaredLogger + interval time.Duration +} + +// NewLogger returns a new Logger. +func NewLogger(base *zap.SugaredLogger, interval time.Duration) *Logger { + return &Logger{ + SugaredLogger: base, + interval: interval, + } +} + +// Interval returns the interval for periodic logging. +func (l *Logger) Interval() time.Duration { + return l.interval +} diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go new file mode 100644 index 0000000..e310695 --- /dev/null +++ b/pkg/logging/logging.go @@ -0,0 +1,131 @@ +package logging + +import ( + "fmt" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "os" + "sync" + "time" +) + +const ( + CONSOLE = "console" + JOURNAL = "systemd-journald" +) + +// defaultEncConfig defines the default zapcore.EncoderConfig for the logging package. +var defaultEncConfig = zapcore.EncoderConfig{ + TimeKey: "ts", + LevelKey: "level", + NameKey: "logger", + CallerKey: "caller", + MessageKey: "msg", + StacktraceKey: "stacktrace", + LineEnding: zapcore.DefaultLineEnding, + EncodeLevel: zapcore.CapitalLevelEncoder, + EncodeTime: zapcore.ISO8601TimeEncoder, + EncodeDuration: zapcore.StringDurationEncoder, + EncodeCaller: zapcore.ShortCallerEncoder, +} + +// Options define child loggers with their desired log level. +type Options map[string]zapcore.Level + +// Logging implements access to a default logger and named child loggers. +// Log levels can be configured per named child via Options which, if not configured, +// fall back on a default log level. +// Logs either to the console or to systemd-journald. +type Logging struct { + logger *Logger + output string + verbosity zap.AtomicLevel + interval time.Duration + + // coreFactory creates zapcore.Core based on the log level and the log output. + coreFactory func(zap.AtomicLevel) zapcore.Core + + mu sync.Mutex + loggers map[string]*Logger + + options Options +} + +// NewLogging takes the name and log level for the default logger, +// output where log messages are written to, +// options having log levels for named child loggers +// and returns a new Logging. +func NewLogging(name string, level zapcore.Level, output string, options Options, interval time.Duration) (*Logging, error) { + verbosity := zap.NewAtomicLevelAt(level) + + var coreFactory func(zap.AtomicLevel) zapcore.Core + switch output { + case CONSOLE: + enc := zapcore.NewConsoleEncoder(defaultEncConfig) + ws := zapcore.Lock(os.Stderr) + coreFactory = func(verbosity zap.AtomicLevel) zapcore.Core { + return zapcore.NewCore(enc, ws, verbosity) + } + case JOURNAL: + coreFactory = func(verbosity zap.AtomicLevel) zapcore.Core { + return NewJournaldCore(name, verbosity) + } + default: + return nil, invalidOutput(output) + } + + logger := NewLogger(zap.New(coreFactory(verbosity)).Named(name).Sugar(), interval) + + return &Logging{ + logger: logger, + output: output, + verbosity: verbosity, + interval: interval, + coreFactory: coreFactory, + loggers: make(map[string]*Logger), + options: options, + }, + nil +} + +// GetChildLogger returns a named child logger. +// Log levels for named child loggers are obtained from the logging options and, if not found, +// set to the default log level. +func (l *Logging) GetChildLogger(name string) *Logger { + l.mu.Lock() + defer l.mu.Unlock() + + if logger, ok := l.loggers[name]; ok { + return logger + } + + var verbosity zap.AtomicLevel + if level, found := l.options[name]; found { + verbosity = zap.NewAtomicLevelAt(level) + } else { + verbosity = l.verbosity + } + + logger := NewLogger(zap.New(l.coreFactory(verbosity)).Named(name).Sugar(), l.interval) + l.loggers[name] = logger + + return logger +} + +// GetLogger returns the default logger. +func (l *Logging) GetLogger() *Logger { + return l.logger +} + +// AssertOutput returns an error if output is not a valid logger output. +func AssertOutput(o string) error { + if o == CONSOLE || o == JOURNAL { + return nil + } + + return invalidOutput(o) +} + +func invalidOutput(o string) error { + return fmt.Errorf("%s is not a valid logger output. Must be either %q or %q", o, CONSOLE, JOURNAL) +} diff --git a/pkg/periodic/periodic.go b/pkg/periodic/periodic.go new file mode 100644 index 0000000..6ef5ceb --- /dev/null +++ b/pkg/periodic/periodic.go @@ -0,0 +1,123 @@ +package periodic + +import ( + "context" + "sync" + "time" +) + +// Option configures Start. +type Option interface { + apply(*periodic) +} + +// Stopper implements the Stop method, +// which stops a periodic task from Start(). +type Stopper interface { + Stop() // Stops a periodic task. +} + +// Tick is the value for periodic task callbacks that +// contains the time of the tick and +// the time elapsed since the start of the periodic task. +type Tick struct { + Elapsed time.Duration + Time time.Time +} + +// Immediate starts the periodic task immediately instead of after the first tick. +func Immediate() Option { + return optionFunc(func(p *periodic) { + p.immediate = true + }) +} + +// OnStop configures a callback that is executed when a periodic task is stopped or canceled. +func OnStop(f func(Tick)) Option { + return optionFunc(func(p *periodic) { + p.onStop = f + }) +} + +// Start starts a periodic task with a ticker at the specified interval, +// which executes the given callback after each tick. +// Pending tasks do not overlap, but could start immediately if +// the previous task(s) takes longer than the interval. +// Call Stop() on the return value in order to stop the ticker and to release associated resources. +// The interval must be greater than zero. +func Start(ctx context.Context, interval time.Duration, callback func(Tick), options ...Option) Stopper { + t := &periodic{ + interval: interval, + callback: callback, + } + + for _, option := range options { + option.apply(t) + } + + ctx, cancelCtx := context.WithCancel(ctx) + + start := time.Now() + + go func() { + done := false + + if !t.immediate { + select { + case <-time.After(interval): + case <-ctx.Done(): + done = true + } + } + + if !done { + ticker := time.NewTicker(t.interval) + defer ticker.Stop() + + for tickTime := time.Now(); !done; { + t.callback(Tick{ + Elapsed: tickTime.Sub(start), + Time: tickTime, + }) + + select { + case tickTime = <-ticker.C: + case <-ctx.Done(): + done = true + } + } + } + + if t.onStop != nil { + now := time.Now() + t.onStop(Tick{ + Elapsed: now.Sub(start), + Time: now, + }) + } + }() + + return stoperFunc(func() { + t.stop.Do(cancelCtx) + }) +} + +type optionFunc func(*periodic) + +func (f optionFunc) apply(p *periodic) { + f(p) +} + +type stoperFunc func() + +func (f stoperFunc) Stop() { + f() +} + +type periodic struct { + interval time.Duration + callback func(Tick) + immediate bool + stop sync.Once + onStop func(Tick) +} diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go new file mode 100644 index 0000000..da73943 --- /dev/null +++ b/pkg/retry/retry.go @@ -0,0 +1,184 @@ +package retry + +import ( + "context" + "database/sql/driver" + "github.com/go-sql-driver/mysql" + "github.com/icinga/icingadb/pkg/backoff" + "github.com/lib/pq" + "github.com/pkg/errors" + "net" + "strings" + "syscall" + "time" +) + +// RetryableFunc is a retryable function. +type RetryableFunc func(context.Context) error + +// IsRetryable checks whether a new attempt can be started based on the error passed. +type IsRetryable func(error) bool + +// Settings aggregates optional settings for WithBackoff. +type Settings struct { + // Timeout lets WithBackoff give up once elapsed (if >0). + Timeout time.Duration + // OnError is called if an error occurs. + OnError func(elapsed time.Duration, attempt uint64, err, lastErr error) + // OnSuccess is called once the operation succeeds. + OnSuccess func(elapsed time.Duration, attempt uint64, lastErr error) +} + +// WithBackoff retries the passed function if it fails and the error allows it to retry. +// The specified backoff policy is used to determine how long to sleep between attempts. +func WithBackoff( + ctx context.Context, retryableFunc RetryableFunc, retryable IsRetryable, b backoff.Backoff, settings Settings, +) (err error) { + parentCtx := ctx + + if settings.Timeout > 0 { + var cancelCtx context.CancelFunc + ctx, cancelCtx = context.WithTimeout(ctx, settings.Timeout) + defer cancelCtx() + } + + start := time.Now() + for attempt := uint64(0); ; /* true */ attempt++ { + prevErr := err + + if err = retryableFunc(ctx); err == nil { + if settings.OnSuccess != nil { + settings.OnSuccess(time.Since(start), attempt, prevErr) + } + + return + } + + if settings.OnError != nil { + settings.OnError(time.Since(start), attempt, err, prevErr) + } + + isRetryable := retryable(err) + + if prevErr != nil && (errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) { + err = prevErr + } + + if !isRetryable { + err = errors.Wrap(err, "can't retry") + + return + } + + sleep := b(attempt) + select { + case <-ctx.Done(): + if outerErr := parentCtx.Err(); outerErr != nil { + err = errors.Wrap(outerErr, "outer context canceled") + } else { + if err == nil { + err = ctx.Err() + } + err = errors.Wrap(err, "can't retry") + } + + return + case <-time.After(sleep): + } + } +} + +// Retryable returns true for common errors that are considered retryable, +// i.e. temporary, timeout, DNS, connection refused and reset, host down and unreachable and +// network down and unreachable errors. +func Retryable(err error) bool { + var temporary interface { + Temporary() bool + } + if errors.As(err, &temporary) && temporary.Temporary() { + return true + } + + var timeout interface { + Timeout() bool + } + if errors.As(err, &timeout) && timeout.Timeout() { + return true + } + + var dnsError *net.DNSError + if errors.As(err, &dnsError) { + return true + } + + var opError *net.OpError + if errors.As(err, &opError) { + // OpError provides Temporary() and Timeout(), but not Unwrap(), + // so we have to extract the underlying error ourselves to also check for ECONNREFUSED, + // which is not considered temporary or timed out by Go. + err = opError.Err + } + if errors.Is(err, syscall.ECONNREFUSED) || errors.Is(err, syscall.ENOENT) { + // syscall errors provide Temporary() and Timeout(), + // which do not include ECONNREFUSED or ENOENT, so we check these ourselves. + return true + } + if errors.Is(err, syscall.ECONNRESET) { + // ECONNRESET is treated as a temporary error by Go only if it comes from calling accept. + return true + } + if errors.Is(err, syscall.EHOSTDOWN) || errors.Is(err, syscall.EHOSTUNREACH) { + return true + } + if errors.Is(err, syscall.ENETDOWN) || errors.Is(err, syscall.ENETUNREACH) { + return true + } + + if errors.Is(err, driver.ErrBadConn) { + return true + } + if errors.Is(err, mysql.ErrInvalidConn) { + return true + } + + var e *mysql.MySQLError + if errors.As(err, &e) { + switch e.Number { + case 1053, 1205, 1213, 2006: + // 1053: Server shutdown in progress + // 1205: Lock wait timeout + // 1213: Deadlock found when trying to get lock + // 2006: MySQL server has gone away + return true + default: + return false + } + } + + var pe *pq.Error + if errors.As(err, &pe) { + switch pe.Code { + case "08000", // connection_exception + "08006", // connection_failure + "08001", // sqlclient_unable_to_establish_sqlconnection + "08004", // sqlserver_rejected_establishment_of_sqlconnection + "40001", // serialization_failure + "40P01", // deadlock_detected + "54000", // program_limit_exceeded + "55006", // object_in_use + "55P03", // lock_not_available + "57P01", // admin_shutdown + "57P02", // crash_shutdown + "57P03", // cannot_connect_now + "58000", // system_error + "58030", // io_error + "XX000": // internal_error + return true + default: + // Class 53 - Insufficient Resources + return strings.HasPrefix(string(pe.Code), "53") + } + } + + return false +} diff --git a/pkg/structify/structify.go b/pkg/structify/structify.go new file mode 100644 index 0000000..2b2b5bb --- /dev/null +++ b/pkg/structify/structify.go @@ -0,0 +1,179 @@ +package structify + +import ( + "encoding" + "fmt" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/pkg/errors" + "golang.org/x/exp/constraints" + "reflect" + "strconv" + "strings" + "unsafe" +) + +// structBranch represents either a leaf or a subTree. +type structBranch struct { + // field specifies the struct field index. + field int + // leaf specifies the map key to parse the struct field from. + leaf string + // subTree specifies the struct field's inner tree. + subTree []structBranch +} + +type MapStructifier = func(map[string]interface{}) (interface{}, error) + +// MakeMapStructifier builds a function which parses a map's string values into a new struct of type t +// and returns a pointer to it. tag specifies which tag connects struct fields to map keys. +// MakeMapStructifier panics if it detects an unsupported type (suitable for usage in init() or global vars). +func MakeMapStructifier(t reflect.Type, tag string) MapStructifier { + tree := buildStructTree(t, tag) + + return func(kv map[string]interface{}) (interface{}, error) { + vPtr := reflect.New(t) + ptr := vPtr.Interface() + + if initer, ok := ptr.(contracts.Initer); ok { + initer.Init() + } + + vPtrElem := vPtr.Elem() + err := errors.Wrapf(structifyMapByTree(kv, tree, vPtrElem, vPtrElem, new([]int)), "can't structify map %#v by tree %#v", kv, tree) + + return ptr, err + } +} + +// buildStructTree assembles a tree which represents the struct t based on tag. +func buildStructTree(t reflect.Type, tag string) []structBranch { + var tree []structBranch + numFields := t.NumField() + + for i := 0; i < numFields; i++ { + if field := t.Field(i); field.PkgPath == "" { + switch tagValue := field.Tag.Get(tag); tagValue { + case "", "-": + case ",inline": + if subTree := buildStructTree(field.Type, tag); subTree != nil { + tree = append(tree, structBranch{i, "", subTree}) + } + default: + // If parseString doesn't support *T, it'll panic. + _ = parseString("", reflect.New(field.Type).Interface()) + + tree = append(tree, structBranch{i, tagValue, nil}) + } + } + } + + return tree +} + +// structifyMapByTree parses src's string values into the struct dest according to tree's specification. +func structifyMapByTree(src map[string]interface{}, tree []structBranch, dest, root reflect.Value, stack *[]int) error { + *stack = append(*stack, 0) + defer func() { + *stack = (*stack)[:len(*stack)-1] + }() + + for _, branch := range tree { + (*stack)[len(*stack)-1] = branch.field + + if branch.subTree == nil { + if v, ok := src[branch.leaf]; ok { + if vs, ok := v.(string); ok { + if err := parseString(vs, dest.Field(branch.field).Addr().Interface()); err != nil { + rt := root.Type() + typ := rt + var path []string + + for _, i := range *stack { + f := typ.Field(i) + path = append(path, f.Name) + typ = f.Type + } + + return errors.Wrapf(err, "can't parse %s into the %s %s#%s: %s", + branch.leaf, typ.Name(), rt.Name(), strings.Join(path, "."), vs) + } + } + } + } else if err := structifyMapByTree(src, branch.subTree, dest.Field(branch.field), root, stack); err != nil { + return err + } + } + + return nil +} + +// parseString parses src into *dest. +func parseString(src string, dest interface{}) error { + switch ptr := dest.(type) { + case encoding.TextUnmarshaler: + return ptr.UnmarshalText([]byte(src)) + case *string: + *ptr = src + return nil + case **string: + *ptr = &src + return nil + case *uint8: + return parseUint(src, ptr) + case *uint16: + return parseUint(src, ptr) + case *uint32: + return parseUint(src, ptr) + case *uint64: + return parseUint(src, ptr) + case *int8: + return parseInt(src, ptr) + case *int16: + return parseInt(src, ptr) + case *int32: + return parseInt(src, ptr) + case *int64: + return parseInt(src, ptr) + case *float32: + return parseFloat(src, ptr) + case *float64: + return parseFloat(src, ptr) + default: + panic(fmt.Sprintf("unsupported type: %T", dest)) + } +} + +// parseUint parses src into *dest. +func parseUint[T constraints.Unsigned](src string, dest *T) error { + i, err := strconv.ParseUint(src, 10, bitSizeOf[T]()) + if err == nil { + *dest = T(i) + } + + return err +} + +// parseInt parses src into *dest. +func parseInt[T constraints.Signed](src string, dest *T) error { + i, err := strconv.ParseInt(src, 10, bitSizeOf[T]()) + if err == nil { + *dest = T(i) + } + + return err +} + +// parseFloat parses src into *dest. +func parseFloat[T constraints.Float](src string, dest *T) error { + f, err := strconv.ParseFloat(src, bitSizeOf[T]()) + if err == nil { + *dest = T(f) + } + + return err +} + +func bitSizeOf[T any]() int { + var x T + return int(unsafe.Sizeof(x) * 8) +} diff --git a/pkg/types/acknowledgement_state.go b/pkg/types/acknowledgement_state.go new file mode 100644 index 0000000..5bff613 --- /dev/null +++ b/pkg/types/acknowledgement_state.go @@ -0,0 +1,61 @@ +package types + +import ( + "database/sql/driver" + "encoding" + "encoding/json" + "github.com/icinga/icingadb/internal" + "github.com/pkg/errors" +) + +// AcknowledgementState specifies an acknowledgement state (yes, no, sticky). +type AcknowledgementState uint8 + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (as *AcknowledgementState) UnmarshalText(text []byte) error { + return as.UnmarshalJSON(text) +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (as *AcknowledgementState) UnmarshalJSON(data []byte) error { + var i uint8 + if err := internal.UnmarshalJSON(data, &i); err != nil { + return err + } + + a := AcknowledgementState(i) + if _, ok := acknowledgementStates[a]; !ok { + return badAcknowledgementState(data) + } + + *as = a + return nil +} + +// Value implements the driver.Valuer interface. +func (as AcknowledgementState) Value() (driver.Value, error) { + if v, ok := acknowledgementStates[as]; ok { + return v, nil + } else { + return nil, badAcknowledgementState(as) + } +} + +// badAcknowledgementState returns an error about a syntactically, but not semantically valid AcknowledgementState. +func badAcknowledgementState(s interface{}) error { + return errors.Errorf("bad acknowledgement state: %#v", s) +} + +// acknowledgementStates maps all valid AcknowledgementState values to their SQL representation. +var acknowledgementStates = map[AcknowledgementState]string{ + 0: "n", + 1: "y", + 2: "sticky", +} + +// Assert interface compliance. +var ( + _ encoding.TextUnmarshaler = (*AcknowledgementState)(nil) + _ json.Unmarshaler = (*AcknowledgementState)(nil) + _ driver.Valuer = AcknowledgementState(0) +) diff --git a/pkg/types/binary.go b/pkg/types/binary.go new file mode 100644 index 0000000..9fb0a9f --- /dev/null +++ b/pkg/types/binary.go @@ -0,0 +1,137 @@ +package types + +import ( + "bytes" + "database/sql" + "database/sql/driver" + "encoding" + "encoding/hex" + "encoding/json" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/pkg/errors" +) + +// Binary nullable byte string. Hex as JSON. +type Binary []byte + +// nullBinary for validating whether a Binary is valid. +var nullBinary Binary + +// Equal returns whether the binaries are the same length and +// contain the same bytes. +func (binary Binary) Equal(equaler contracts.Equaler) bool { + b, ok := equaler.(Binary) + if !ok { + panic("bad Binary type assertion") + } + + return bytes.Equal(binary, b) +} + +// Valid returns whether the Binary is valid. +func (binary Binary) Valid() bool { + return !bytes.Equal(binary, nullBinary) +} + +// String returns the hex string representation form of the Binary. +func (binary Binary) String() string { + return hex.EncodeToString(binary) +} + +// MarshalText implements a custom marhsal function to encode +// the Binary as hex. MarshalText implements the +// encoding.TextMarshaler interface. +func (binary Binary) MarshalText() ([]byte, error) { + return []byte(binary.String()), nil +} + +// UnmarshalText implements a custom unmarshal function to decode +// hex into a Binary. UnmarshalText implements the +// encoding.TextUnmarshaler interface. +func (binary *Binary) UnmarshalText(text []byte) error { + b := make([]byte, hex.DecodedLen(len(text))) + _, err := hex.Decode(b, text) + if err != nil { + return internal.CantDecodeHex(err, string(text)) + } + *binary = b + + return nil +} + +// MarshalJSON implements a custom marshal function to encode the Binary +// as a hex string. MarshalJSON implements the json.Marshaler interface. +// Supports JSON null. +func (binary Binary) MarshalJSON() ([]byte, error) { + if !binary.Valid() { + return []byte("null"), nil + } + + return internal.MarshalJSON(binary.String()) +} + +// UnmarshalJSON implements a custom unmarshal function to decode +// a JSON hex string into a Binary. UnmarshalJSON implements the +// json.Unmarshaler interface. Supports JSON null. +func (binary *Binary) UnmarshalJSON(data []byte) error { + if string(data) == "null" || len(data) == 0 { + return nil + } + + var s string + if err := internal.UnmarshalJSON(data, &s); err != nil { + return err + } + b, err := hex.DecodeString(s) + if err != nil { + return internal.CantDecodeHex(err, s) + } + *binary = b + + return nil +} + +// Scan implements the sql.Scanner interface. +// Supports SQL NULL. +func (binary *Binary) Scan(src interface{}) error { + switch src := src.(type) { + case nil: + return nil + + case []byte: + if len(src) == 0 { + return nil + } + + b := make([]byte, len(src)) + copy(b, src) + *binary = b + + default: + return errors.Errorf("unable to scan type %T into Binary", src) + } + + return nil +} + +// Value implements the driver.Valuer interface. +// Supports SQL NULL. +func (binary Binary) Value() (driver.Value, error) { + if !binary.Valid() { + return nil, nil + } + + return []byte(binary), nil +} + +// Assert interface compliance. +var ( + _ contracts.ID = (*Binary)(nil) + _ encoding.TextMarshaler = (*Binary)(nil) + _ encoding.TextUnmarshaler = (*Binary)(nil) + _ json.Marshaler = (*Binary)(nil) + _ json.Unmarshaler = (*Binary)(nil) + _ sql.Scanner = (*Binary)(nil) + _ driver.Valuer = (*Binary)(nil) +) diff --git a/pkg/types/binary_test.go b/pkg/types/binary_test.go new file mode 100644 index 0000000..2a4f829 --- /dev/null +++ b/pkg/types/binary_test.go @@ -0,0 +1,29 @@ +package types + +import ( + "github.com/stretchr/testify/require" + "testing" + "unicode/utf8" +) + +func TestBinary_MarshalJSON(t *testing.T) { + subtests := []struct { + name string + input Binary + output string + }{ + {"nil", nil, `null`}, + {"empty", make(Binary, 0, 1), `null`}, + {"space", Binary(" "), `"20"`}, + } + + for _, st := range subtests { + t.Run(st.name, func(t *testing.T) { + actual, err := st.input.MarshalJSON() + + require.NoError(t, err) + require.True(t, utf8.Valid(actual)) + require.Equal(t, st.output, string(actual)) + }) + } +} diff --git a/pkg/types/bool.go b/pkg/types/bool.go new file mode 100644 index 0000000..2b96c09 --- /dev/null +++ b/pkg/types/bool.go @@ -0,0 +1,105 @@ +package types + +import ( + "database/sql" + "database/sql/driver" + "encoding" + "encoding/json" + "github.com/icinga/icingadb/internal" + "github.com/pkg/errors" + "strconv" +) + +var ( + enum = map[bool]string{ + true: "y", + false: "n", + } +) + +// Bool represents a bool for ENUM ('y', 'n'), which can be NULL. +type Bool struct { + Bool bool + Valid bool // Valid is true if Bool is not NULL +} + +// MarshalJSON implements the json.Marshaler interface. +func (b Bool) MarshalJSON() ([]byte, error) { + if !b.Valid { + return []byte("null"), nil + } + + return internal.MarshalJSON(b.Bool) +} + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (b *Bool) UnmarshalText(text []byte) error { + parsed, err := strconv.ParseUint(string(text), 10, 64) + if err != nil { + return internal.CantParseUint64(err, string(text)) + } + + *b = Bool{parsed != 0, true} + return nil +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (b *Bool) UnmarshalJSON(data []byte) error { + if string(data) == "null" || len(data) == 0 { + return nil + } + + if err := internal.UnmarshalJSON(data, &b.Bool); err != nil { + return err + } + + b.Valid = true + + return nil +} + +// Scan implements the sql.Scanner interface. +// Supports SQL NULL. +func (b *Bool) Scan(src interface{}) error { + if src == nil { + b.Bool, b.Valid = false, false + return nil + } + + v, ok := src.([]byte) + if !ok { + return errors.Errorf("bad []byte type assertion from %#v", src) + } + + switch string(v) { + case "y": + b.Bool = true + case "n": + b.Bool = false + default: + return errors.Errorf("bad bool %#v", v) + } + + b.Valid = true + + return nil +} + +// Value implements the driver.Valuer interface. +// Supports SQL NULL. +func (b Bool) Value() (driver.Value, error) { + if !b.Valid { + return nil, nil + } + + return enum[b.Bool], nil +} + +// Assert interface compliance. +var ( + _ json.Marshaler = (*Bool)(nil) + _ encoding.TextUnmarshaler = (*Bool)(nil) + _ json.Unmarshaler = (*Bool)(nil) + _ sql.Scanner = (*Bool)(nil) + _ driver.Valuer = (*Bool)(nil) +) diff --git a/pkg/types/bool_test.go b/pkg/types/bool_test.go new file mode 100644 index 0000000..fe49588 --- /dev/null +++ b/pkg/types/bool_test.go @@ -0,0 +1,30 @@ +package types + +import ( + "fmt" + "github.com/stretchr/testify/require" + "testing" + "unicode/utf8" +) + +func TestBool_MarshalJSON(t *testing.T) { + subtests := []struct { + input Bool + output string + }{ + {Bool{Bool: false, Valid: false}, `null`}, + {Bool{Bool: false, Valid: true}, `false`}, + {Bool{Bool: true, Valid: false}, `null`}, + {Bool{Bool: true, Valid: true}, `true`}, + } + + for _, st := range subtests { + t.Run(fmt.Sprintf("Bool-%#v_Valid-%#v", st.input.Bool, st.input.Valid), func(t *testing.T) { + actual, err := st.input.MarshalJSON() + + require.NoError(t, err) + require.True(t, utf8.Valid(actual)) + require.Equal(t, st.output, string(actual)) + }) + } +} diff --git a/pkg/types/comment_type.go b/pkg/types/comment_type.go new file mode 100644 index 0000000..8aed475 --- /dev/null +++ b/pkg/types/comment_type.go @@ -0,0 +1,79 @@ +package types + +import ( + "database/sql/driver" + "encoding" + "encoding/json" + "github.com/icinga/icingadb/internal" + "github.com/pkg/errors" + "strconv" +) + +// CommentType specifies a comment's origin's kind. +type CommentType uint8 + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (ct *CommentType) UnmarshalJSON(data []byte) error { + var i uint8 + if err := internal.UnmarshalJSON(data, &i); err != nil { + return err + } + + c := CommentType(i) + if _, ok := commentTypes[c]; !ok { + return badCommentType(data) + } + + *ct = c + return nil +} + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (ct *CommentType) UnmarshalText(text []byte) error { + s := string(text) + + i, err := strconv.ParseUint(s, 10, 64) + if err != nil { + return internal.CantParseUint64(err, s) + } + + c := CommentType(i) + if uint64(c) != i { + // Truncated due to above cast, obviously too high + return badCommentType(s) + } + + if _, ok := commentTypes[c]; !ok { + return badCommentType(s) + } + + *ct = c + return nil +} + +// Value implements the driver.Valuer interface. +func (ct CommentType) Value() (driver.Value, error) { + if v, ok := commentTypes[ct]; ok { + return v, nil + } else { + return nil, badCommentType(ct) + } +} + +// badCommentType returns an error about a syntactically, but not semantically valid CommentType. +func badCommentType(t interface{}) error { + return errors.Errorf("bad comment type: %#v", t) +} + +// commentTypes maps all valid CommentType values to their SQL representation. +var commentTypes = map[CommentType]string{ + 1: "comment", + 4: "ack", +} + +// Assert interface compliance. +var ( + _ json.Unmarshaler = (*CommentType)(nil) + _ encoding.TextUnmarshaler = (*CommentType)(nil) + _ driver.Valuer = CommentType(0) +) diff --git a/pkg/types/float.go b/pkg/types/float.go new file mode 100644 index 0000000..a4aedd6 --- /dev/null +++ b/pkg/types/float.go @@ -0,0 +1,68 @@ +package types + +import ( + "bytes" + "database/sql" + "database/sql/driver" + "encoding" + "encoding/json" + "github.com/icinga/icingadb/internal" + "strconv" +) + +// Float adds JSON support to sql.NullFloat64. +type Float struct { + sql.NullFloat64 +} + +// MarshalJSON implements the json.Marshaler interface. +// Supports JSON null. +func (f Float) MarshalJSON() ([]byte, error) { + var v interface{} + if f.Valid { + v = f.Float64 + } + + return internal.MarshalJSON(v) +} + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (f *Float) UnmarshalText(text []byte) error { + parsed, err := strconv.ParseFloat(string(text), 64) + if err != nil { + return internal.CantParseFloat64(err, string(text)) + } + + *f = Float{sql.NullFloat64{ + Float64: parsed, + Valid: true, + }} + + return nil +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +// Supports JSON null. +func (f *Float) UnmarshalJSON(data []byte) error { + // Ignore null, like in the main JSON package. + if bytes.HasPrefix(data, []byte{'n'}) { + return nil + } + + if err := internal.UnmarshalJSON(data, &f.Float64); err != nil { + return err + } + + f.Valid = true + + return nil +} + +// Assert interface compliance. +var ( + _ json.Marshaler = Float{} + _ encoding.TextUnmarshaler = (*Float)(nil) + _ json.Unmarshaler = (*Float)(nil) + _ driver.Valuer = Float{} + _ sql.Scanner = (*Float)(nil) +) diff --git a/pkg/types/int.go b/pkg/types/int.go new file mode 100644 index 0000000..0e51f21 --- /dev/null +++ b/pkg/types/int.go @@ -0,0 +1,68 @@ +package types + +import ( + "bytes" + "database/sql" + "database/sql/driver" + "encoding" + "encoding/json" + "github.com/icinga/icingadb/internal" + "strconv" +) + +// Int adds JSON support to sql.NullInt64. +type Int struct { + sql.NullInt64 +} + +// MarshalJSON implements the json.Marshaler interface. +// Supports JSON null. +func (i Int) MarshalJSON() ([]byte, error) { + var v interface{} + if i.Valid { + v = i.Int64 + } + + return internal.MarshalJSON(v) +} + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (i *Int) UnmarshalText(text []byte) error { + parsed, err := strconv.ParseInt(string(text), 10, 64) + if err != nil { + return internal.CantParseInt64(err, string(text)) + } + + *i = Int{sql.NullInt64{ + Int64: parsed, + Valid: true, + }} + + return nil +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +// Supports JSON null. +func (i *Int) UnmarshalJSON(data []byte) error { + // Ignore null, like in the main JSON package. + if bytes.HasPrefix(data, []byte{'n'}) { + return nil + } + + if err := internal.UnmarshalJSON(data, &i.Int64); err != nil { + return err + } + + i.Valid = true + + return nil +} + +// Assert interface compliance. +var ( + _ json.Marshaler = Int{} + _ json.Unmarshaler = (*Int)(nil) + _ encoding.TextUnmarshaler = (*Int)(nil) + _ driver.Valuer = Int{} + _ sql.Scanner = (*Int)(nil) +) diff --git a/pkg/types/notification_states.go b/pkg/types/notification_states.go new file mode 100644 index 0000000..ff5760a --- /dev/null +++ b/pkg/types/notification_states.go @@ -0,0 +1,78 @@ +package types + +import ( + "database/sql/driver" + "encoding" + "encoding/json" + "github.com/icinga/icingadb/internal" + "github.com/pkg/errors" +) + +// NotificationStates specifies the set of states a notification may be sent for. +type NotificationStates uint8 + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (nst *NotificationStates) UnmarshalJSON(data []byte) error { + var states []string + if err := internal.UnmarshalJSON(data, &states); err != nil { + return err + } + + var n NotificationStates + for _, state := range states { + if v, ok := notificationStateNames[state]; ok { + n |= v + } else { + return badNotificationStates(states) + } + } + + *nst = n + return nil +} + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (nst *NotificationStates) UnmarshalText(text []byte) error { + return nst.UnmarshalJSON(text) +} + +// Value implements the driver.Valuer interface. +func (nst NotificationStates) Value() (driver.Value, error) { + if nst&^allNotificationStates == 0 { + return int64(nst), nil + } else { + return nil, badNotificationStates(nst) + } +} + +// badNotificationStates returns an error about syntactically, but not semantically valid NotificationStates. +func badNotificationStates(s interface{}) error { + return errors.Errorf("bad notification states: %#v", s) +} + +// notificationStateNames maps all valid NotificationStates values to their SQL representation. +var notificationStateNames = map[string]NotificationStates{ + "OK": 1, + "Warning": 2, + "Critical": 4, + "Unknown": 8, + "Up": 16, + "Down": 32, +} + +// allNotificationStates is the largest valid NotificationStates value. +var allNotificationStates = func() NotificationStates { + var nt NotificationStates + for _, v := range notificationStateNames { + nt |= v + } + + return nt +}() + +// Assert interface compliance. +var ( + _ json.Unmarshaler = (*NotificationStates)(nil) + _ encoding.TextUnmarshaler = (*NotificationStates)(nil) + _ driver.Valuer = NotificationStates(0) +) diff --git a/pkg/types/notification_type.go b/pkg/types/notification_type.go new file mode 100644 index 0000000..f2980f4 --- /dev/null +++ b/pkg/types/notification_type.go @@ -0,0 +1,68 @@ +package types + +import ( + "database/sql/driver" + "encoding" + "github.com/icinga/icingadb/internal" + "github.com/pkg/errors" + "strconv" +) + +// NotificationType specifies the reason of a sent notification. +type NotificationType uint16 + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (nt *NotificationType) UnmarshalText(text []byte) error { + s := string(text) + + i, err := strconv.ParseUint(s, 10, 64) + if err != nil { + return internal.CantParseUint64(err, s) + } + + n := NotificationType(i) + if uint64(n) != i { + // Truncated due to above cast, obviously too high + return badNotificationType(s) + } + + if _, ok := notificationTypes[n]; !ok { + return badNotificationType(s) + } + + *nt = n + return nil +} + +// Value implements the driver.Valuer interface. +func (nt NotificationType) Value() (driver.Value, error) { + if v, ok := notificationTypes[nt]; ok { + return v, nil + } else { + return nil, badNotificationType(nt) + } +} + +// badNotificationType returns an error about a syntactically, but not semantically valid NotificationType. +func badNotificationType(t interface{}) error { + return errors.Errorf("bad notification type: %#v", t) +} + +// notificationTypes maps all valid NotificationType values to their SQL representation. +var notificationTypes = map[NotificationType]string{ + 1: "downtime_start", + 2: "downtime_end", + 4: "downtime_removed", + 8: "custom", + 16: "acknowledgement", + 32: "problem", + 64: "recovery", + 128: "flapping_start", + 256: "flapping_end", +} + +// Assert interface compliance. +var ( + _ encoding.TextUnmarshaler = (*NotificationType)(nil) + _ driver.Valuer = NotificationType(0) +) diff --git a/pkg/types/notification_types.go b/pkg/types/notification_types.go new file mode 100644 index 0000000..832a515 --- /dev/null +++ b/pkg/types/notification_types.go @@ -0,0 +1,81 @@ +package types + +import ( + "database/sql/driver" + "encoding" + "encoding/json" + "github.com/icinga/icingadb/internal" + "github.com/pkg/errors" +) + +// NotificationTypes specifies the set of reasons a notification may be sent for. +type NotificationTypes uint16 + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (nt *NotificationTypes) UnmarshalJSON(data []byte) error { + var types []string + if err := internal.UnmarshalJSON(data, &types); err != nil { + return err + } + + var n NotificationTypes + for _, typ := range types { + if v, ok := notificationTypeNames[typ]; ok { + n |= v + } else { + return badNotificationTypes(nt) + } + } + + *nt = n + return nil +} + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (nt *NotificationTypes) UnmarshalText(text []byte) error { + return nt.UnmarshalJSON(text) +} + +// Value implements the driver.Valuer interface. +func (nt NotificationTypes) Value() (driver.Value, error) { + if nt&^allNotificationTypes == 0 { + return int64(nt), nil + } else { + return nil, badNotificationTypes(nt) + } +} + +// badNotificationTypes returns an error about syntactically, but not semantically valid NotificationTypes. +func badNotificationTypes(t interface{}) error { + return errors.Errorf("bad notification types: %#v", t) +} + +// notificationTypeNames maps all valid NotificationTypes values to their SQL representation. +var notificationTypeNames = map[string]NotificationTypes{ + "DowntimeStart": 1, + "DowntimeEnd": 2, + "DowntimeRemoved": 4, + "Custom": 8, + "Acknowledgement": 16, + "Problem": 32, + "Recovery": 64, + "FlappingStart": 128, + "FlappingEnd": 256, +} + +// allNotificationTypes is the largest valid NotificationTypes value. +var allNotificationTypes = func() NotificationTypes { + var nt NotificationTypes + for _, v := range notificationTypeNames { + nt |= v + } + + return nt +}() + +// Assert interface compliance. +var ( + _ json.Unmarshaler = (*NotificationTypes)(nil) + _ encoding.TextUnmarshaler = (*NotificationTypes)(nil) + _ driver.Valuer = NotificationTypes(0) +) diff --git a/pkg/types/state_type.go b/pkg/types/state_type.go new file mode 100644 index 0000000..f0cc69a --- /dev/null +++ b/pkg/types/state_type.go @@ -0,0 +1,65 @@ +package types + +import ( + "database/sql/driver" + "encoding" + "encoding/json" + "github.com/icinga/icingadb/internal" + "github.com/pkg/errors" +) + +// StateType specifies a state's hardness. +type StateType uint8 + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (st *StateType) UnmarshalText(text []byte) error { + return st.UnmarshalJSON(text) +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (st *StateType) UnmarshalJSON(data []byte) error { + var i uint8 + if err := internal.UnmarshalJSON(data, &i); err != nil { + return err + } + + s := StateType(i) + if _, ok := stateTypes[s]; !ok { + return badStateType(data) + } + + *st = s + return nil +} + +// Value implements the driver.Valuer interface. +func (st StateType) Value() (driver.Value, error) { + if v, ok := stateTypes[st]; ok { + return v, nil + } else { + return nil, badStateType(st) + } +} + +// badStateType returns and error about a syntactically, but not semantically valid StateType. +func badStateType(t interface{}) error { + return errors.Errorf("bad state type: %#v", t) +} + +const ( + StateSoft = StateType(0) + StateHard = StateType(1) +) + +// stateTypes maps all valid StateType values to their SQL representation. +var stateTypes = map[StateType]string{ + StateSoft: "soft", + StateHard: "hard", +} + +// Assert interface compliance. +var ( + _ encoding.TextUnmarshaler = (*StateType)(nil) + _ json.Unmarshaler = (*StateType)(nil) + _ driver.Valuer = StateType(0) +) diff --git a/pkg/types/string.go b/pkg/types/string.go new file mode 100644 index 0000000..f8ead45 --- /dev/null +++ b/pkg/types/string.go @@ -0,0 +1,74 @@ +package types + +import ( + "bytes" + "database/sql" + "database/sql/driver" + "encoding" + "encoding/json" + "github.com/icinga/icingadb/internal" + "strings" +) + +// String adds JSON support to sql.NullString. +type String struct { + sql.NullString +} + +// MarshalJSON implements the json.Marshaler interface. +// Supports JSON null. +func (s String) MarshalJSON() ([]byte, error) { + var v interface{} + if s.Valid { + v = s.String + } + + return internal.MarshalJSON(v) +} + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (s *String) UnmarshalText(text []byte) error { + *s = String{sql.NullString{ + String: string(text), + Valid: true, + }} + + return nil +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +// Supports JSON null. +func (s *String) UnmarshalJSON(data []byte) error { + // Ignore null, like in the main JSON package. + if bytes.HasPrefix(data, []byte{'n'}) { + return nil + } + + if err := internal.UnmarshalJSON(data, &s.String); err != nil { + return err + } + + s.Valid = true + + return nil +} + +// Value implements the driver.Valuer interface. +// Supports SQL NULL. +func (s String) Value() (driver.Value, error) { + if !s.Valid { + return nil, nil + } + + // PostgreSQL does not allow null bytes in varchar, char and text fields. + return strings.ReplaceAll(s.String, "\x00", ""), nil +} + +// Assert interface compliance. +var ( + _ json.Marshaler = String{} + _ encoding.TextUnmarshaler = (*String)(nil) + _ json.Unmarshaler = (*String)(nil) + _ driver.Valuer = String{} + _ sql.Scanner = (*String)(nil) +) diff --git a/pkg/types/unix_milli.go b/pkg/types/unix_milli.go new file mode 100644 index 0000000..3f6f988 --- /dev/null +++ b/pkg/types/unix_milli.go @@ -0,0 +1,95 @@ +package types + +import ( + "database/sql" + "database/sql/driver" + "encoding" + "encoding/json" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "strconv" + "time" +) + +// UnixMilli is a nullable millisecond UNIX timestamp in databases and JSON. +type UnixMilli time.Time + +// Time returns the time.Time conversion of UnixMilli. +func (t UnixMilli) Time() time.Time { + return time.Time(t) +} + +// MarshalJSON implements the json.Marshaler interface. +// Marshals to milliseconds. Supports JSON null. +func (t UnixMilli) MarshalJSON() ([]byte, error) { + if time.Time(t).IsZero() { + return []byte("null"), nil + } + + return []byte(strconv.FormatInt(time.Time(t).UnixMilli(), 10)), nil +} + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (t *UnixMilli) UnmarshalText(text []byte) error { + parsed, err := strconv.ParseFloat(string(text), 64) + if err != nil { + return internal.CantParseFloat64(err, string(text)) + } + + *t = UnixMilli(utils.FromUnixMilli(int64(parsed))) + return nil +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +// Unmarshals from milliseconds. Supports JSON null. +func (t *UnixMilli) UnmarshalJSON(data []byte) error { + if string(data) == "null" || len(data) == 0 { + return nil + } + + ms, err := strconv.ParseFloat(string(data), 64) + if err != nil { + return internal.CantParseFloat64(err, string(data)) + } + tt := utils.FromUnixMilli(int64(ms)) + *t = UnixMilli(tt) + + return nil +} + +// Scan implements the sql.Scanner interface. +// Scans from milliseconds. Supports SQL NULL. +func (t *UnixMilli) Scan(src interface{}) error { + if src == nil { + return nil + } + + v, ok := src.(int64) + if !ok { + return errors.Errorf("bad int64 type assertion from %#v", src) + } + tt := utils.FromUnixMilli(v) + *t = UnixMilli(tt) + + return nil +} + +// Value implements the driver.Valuer interface. +// Returns milliseconds. Supports SQL NULL. +func (t UnixMilli) Value() (driver.Value, error) { + if t.Time().IsZero() { + return nil, nil + } + + return t.Time().UnixMilli(), nil +} + +// Assert interface compliance. +var ( + _ json.Marshaler = (*UnixMilli)(nil) + _ encoding.TextUnmarshaler = (*UnixMilli)(nil) + _ json.Unmarshaler = (*UnixMilli)(nil) + _ sql.Scanner = (*UnixMilli)(nil) + _ driver.Valuer = (*UnixMilli)(nil) +) diff --git a/pkg/types/unix_milli_test.go b/pkg/types/unix_milli_test.go new file mode 100644 index 0000000..985fa2a --- /dev/null +++ b/pkg/types/unix_milli_test.go @@ -0,0 +1,30 @@ +package types + +import ( + "github.com/stretchr/testify/require" + "testing" + "time" + "unicode/utf8" +) + +func TestUnixMilli_MarshalJSON(t *testing.T) { + subtests := []struct { + name string + input UnixMilli + output string + }{ + {"zero", UnixMilli{}, `null`}, + {"epoch", UnixMilli(time.Unix(0, 0)), `0`}, + {"nonzero", UnixMilli(time.Unix(1234567890, 62500000)), `1234567890062`}, + } + + for _, st := range subtests { + t.Run(st.name, func(t *testing.T) { + actual, err := st.input.MarshalJSON() + + require.NoError(t, err) + require.True(t, utf8.Valid(actual)) + require.Equal(t, st.output, string(actual)) + }) + } +} diff --git a/pkg/types/uuid.go b/pkg/types/uuid.go new file mode 100644 index 0000000..02acbcd --- /dev/null +++ b/pkg/types/uuid.go @@ -0,0 +1,24 @@ +package types + +import ( + "database/sql/driver" + "encoding" + "github.com/google/uuid" +) + +// UUID is like uuid.UUID, but marshals itself binarily (not like xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx) in SQL context. +type UUID struct { + uuid.UUID +} + +// Value implements driver.Valuer. +func (uuid UUID) Value() (driver.Value, error) { + return uuid.UUID[:], nil +} + +// Assert interface compliance. +var ( + _ encoding.TextUnmarshaler = (*UUID)(nil) + _ driver.Valuer = UUID{} + _ driver.Valuer = (*UUID)(nil) +) diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go new file mode 100644 index 0000000..4b0fe1d --- /dev/null +++ b/pkg/utils/utils.go @@ -0,0 +1,231 @@ +package utils + +import ( + "context" + "crypto/sha1" + "fmt" + "github.com/go-sql-driver/mysql" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/lib/pq" + "github.com/pkg/errors" + "golang.org/x/exp/utf8string" + "math" + "net" + "os" + "path/filepath" + "strings" + "time" + "unicode" +) + +// FromUnixMilli creates and returns a time.Time value +// from the given milliseconds since the Unix epoch ms. +func FromUnixMilli(ms int64) time.Time { + sec, dec := math.Modf(float64(ms) / 1e3) + + return time.Unix(int64(sec), int64(dec*(1e9))) +} + +// Name returns the declared name of type t. +// Name is used in combination with Key +// to automatically guess an entity's +// database table and Redis key. +func Name(t interface{}) string { + s := strings.TrimLeft(fmt.Sprintf("%T", t), "*") + + return s[strings.LastIndex(s, ".")+1:] +} + +// TableName returns the table of t. +func TableName(t interface{}) string { + if tn, ok := t.(contracts.TableNamer); ok { + return tn.TableName() + } else { + return Key(Name(t), '_') + } +} + +// Key returns the name with all Unicode letters mapped to lower case letters, +// with an additional separator in front of each original upper case letter. +func Key(name string, sep byte) string { + return ConvertCamelCase(name, unicode.LowerCase, sep) +} + +// Timed calls the given callback with the time that has elapsed since the start. +// +// Timed should be installed by defer: +// +// func TimedExample(logger *zap.SugaredLogger) { +// defer utils.Timed(time.Now(), func(elapsed time.Duration) { +// logger.Debugf("Executed job in %s", elapsed) +// }) +// job() +// } +func Timed(start time.Time, callback func(elapsed time.Duration)) { + callback(time.Since(start)) +} + +// BatchSliceOfStrings groups the given keys into chunks of size count and streams them into a returned channel. +func BatchSliceOfStrings(ctx context.Context, keys []string, count int) <-chan []string { + batches := make(chan []string) + + go func() { + defer close(batches) + + for i := 0; i < len(keys); i += count { + end := i + count + if end > len(keys) { + end = len(keys) + } + + select { + case batches <- keys[i:end]: + case <-ctx.Done(): + return + } + } + }() + + return batches +} + +// IsContextCanceled returns whether the given error is context.Canceled. +func IsContextCanceled(err error) bool { + return errors.Is(err, context.Canceled) +} + +// Checksum returns the SHA-1 checksum of the data. +func Checksum(data interface{}) []byte { + var chksm [sha1.Size]byte + + switch data := data.(type) { + case string: + chksm = sha1.Sum([]byte(data)) + case []byte: + chksm = sha1.Sum(data) + default: + panic(fmt.Sprintf("Unable to create checksum for type %T", data)) + } + + return chksm[:] +} + +// Fatal panics with the given error. +func Fatal(err error) { + panic(err) +} + +// IsDeadlock returns whether the given error signals serialization failure. +func IsDeadlock(err error) bool { + var e *mysql.MySQLError + if errors.As(err, &e) { + switch e.Number { + case 1205, 1213: + return true + default: + return false + } + } + + var pe *pq.Error + if errors.As(err, &pe) { + switch pe.Code { + case "40001", "40P01": + return true + } + } + + return false +} + +var ellipsis = utf8string.NewString("...") + +// Ellipsize shortens s to <=limit runes and indicates shortening by "...". +func Ellipsize(s string, limit int) string { + utf8 := utf8string.NewString(s) + switch { + case utf8.RuneCount() <= limit: + return s + case utf8.RuneCount() <= ellipsis.RuneCount(): + return ellipsis.String() + default: + return utf8.Slice(0, limit-ellipsis.RuneCount()) + ellipsis.String() + } +} + +// ConvertCamelCase converts a (lower) CamelCase string into various cases. +// _case must be unicode.Lower or unicode.Upper. +// +// Example usage: +// +// # snake_case +// ConvertCamelCase(s, unicode.Lower, '_') +// +// # SCREAMING_SNAKE_CASE +// ConvertCamelCase(s, unicode.Upper, '_') +// +// # kebab-case +// ConvertCamelCase(s, unicode.Lower, '-') +// +// # SCREAMING-KEBAB-CASE +// ConvertCamelCase(s, unicode.Upper, '-') +// +// # other.separator +// ConvertCamelCase(s, unicode.Lower, '.') +func ConvertCamelCase(s string, _case int, sep byte) string { + r := []rune(s) + b := strings.Builder{} + b.Grow(len(r) + 2) // nominal 2 bytes of extra space for inserted delimiters + + b.WriteRune(unicode.To(_case, r[0])) + for _, r := range r[1:] { + if sep != 0 && unicode.IsUpper(r) { + b.WriteByte(sep) + } + + b.WriteRune(unicode.To(_case, r)) + } + + return b.String() +} + +// AppName returns the name of the executable that started this program (process). +func AppName() string { + exe, err := os.Executable() + if err != nil { + exe = os.Args[0] + } + + return filepath.Base(exe) +} + +// MaxInt returns the larger of the given integers. +func MaxInt(x, y int) int { + if x > y { + return x + } + + return y +} + +// JoinHostPort is like its equivalent in net., but handles UNIX sockets as well. +func JoinHostPort(host string, port int) string { + if strings.HasPrefix(host, "/") { + return host + } + + return net.JoinHostPort(host, fmt.Sprint(port)) +} + +// ChanFromSlice takes a slice of values and returns a channel from which these values can be received. +// This channel is closed after the last value was sent. +func ChanFromSlice[T any](values []T) <-chan T { + ch := make(chan T, len(values)) + for _, value := range values { + ch <- value + } + + close(ch) + + return ch +} diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go new file mode 100644 index 0000000..b0ea54b --- /dev/null +++ b/pkg/utils/utils_test.go @@ -0,0 +1,54 @@ +package utils + +import ( + "github.com/stretchr/testify/require" + "testing" +) + +func TestChanFromSlice(t *testing.T) { + t.Run("Nil", func(t *testing.T) { + ch := ChanFromSlice[int](nil) + require.NotNil(t, ch) + requireClosedEmpty(t, ch) + }) + + t.Run("Empty", func(t *testing.T) { + ch := ChanFromSlice([]int{}) + require.NotNil(t, ch) + requireClosedEmpty(t, ch) + }) + + t.Run("NonEmpty", func(t *testing.T) { + ch := ChanFromSlice([]int{42, 23, 1337}) + require.NotNil(t, ch) + requireReceive(t, ch, 42) + requireReceive(t, ch, 23) + requireReceive(t, ch, 1337) + requireClosedEmpty(t, ch) + }) +} + +// requireReceive is a helper function to check if a value can immediately be received from a channel. +func requireReceive(t *testing.T, ch <-chan int, expected int) { + t.Helper() + + select { + case v, ok := <-ch: + require.True(t, ok, "receiving should return a value") + require.Equal(t, expected, v) + default: + require.Fail(t, "receiving should not block") + } +} + +// requireReceive is a helper function to check if the channel is closed and empty. +func requireClosedEmpty(t *testing.T, ch <-chan int) { + t.Helper() + + select { + case _, ok := <-ch: + require.False(t, ok, "receiving from channel should not return anything") + default: + require.Fail(t, "receiving should not block") + } +} diff --git a/pkg/version/version.go b/pkg/version/version.go new file mode 100644 index 0000000..250318c --- /dev/null +++ b/pkg/version/version.go @@ -0,0 +1,180 @@ +package version + +import ( + "bufio" + "errors" + "fmt" + "os" + "runtime" + "runtime/debug" + "strconv" + "strings" +) + +type VersionInfo struct { + Version string + Commit string +} + +// Version determines version and commit information based on multiple data sources: +// - Version information dynamically added by `git archive` in the remaining to parameters. +// - A hardcoded version number passed as first parameter. +// - Commit information added to the binary by `go build`. +// +// It's supposed to be called like this in combination with setting the `export-subst` attribute for the corresponding +// file in .gitattributes: +// +// var Version = version.Version("1.0.0-rc2", "$Format:%(describe)$", "$Format:%H$") +// +// When exported using `git archive`, the placeholders are replaced in the file and this version information is +// preferred. Otherwise the hardcoded version is used and augmented with commit information from the build metadata. +func Version(version, gitDescribe, gitHash string) *VersionInfo { + const hashLen = 7 // Same truncation length for the commit hash as used by git describe. + + if !strings.HasPrefix(gitDescribe, "$") && !strings.HasPrefix(gitHash, "$") { + if strings.HasPrefix(gitDescribe, "%") { + // Only Git 2.32+ supports %(describe), older versions don't expand it but keep it as-is. + // Fall back to the hardcoded version augmented with the commit hash. + gitDescribe = version + + if len(gitHash) >= hashLen { + gitDescribe += "-g" + gitHash[:hashLen] + } + } + + return &VersionInfo{ + Version: gitDescribe, + Commit: gitHash, + } + } else { + commit := "" + + if info, ok := debug.ReadBuildInfo(); ok { + modified := false + + for _, setting := range info.Settings { + switch setting.Key { + case "vcs.revision": + commit = setting.Value + case "vcs.modified": + modified, _ = strconv.ParseBool(setting.Value) + } + } + + if len(commit) >= hashLen { + version += "-g" + commit[:hashLen] + + if modified { + version += "-dirty" + commit += " (modified)" + } + } + } + + return &VersionInfo{ + Version: version, + Commit: commit, + } + } +} + +// Print writes verbose version output to stdout. +func (v *VersionInfo) Print() { + fmt.Println("Icinga DB version:", v.Version) + fmt.Println() + + fmt.Println("Build information:") + fmt.Printf(" Go version: %s (%s, %s)\n", runtime.Version(), runtime.GOOS, runtime.GOARCH) + if v.Commit != "" { + fmt.Println(" Git commit:", v.Commit) + } + + if r, err := readOsRelease(); err == nil { + fmt.Println() + fmt.Println("System information:") + fmt.Println(" Platform:", r.Name) + fmt.Println(" Platform version:", r.DisplayVersion()) + } +} + +// osRelease contains the information obtained from the os-release file. +type osRelease struct { + Name string + Version string + VersionId string + BuildId string +} + +// DisplayVersion returns the most suitable version information for display purposes. +func (o *osRelease) DisplayVersion() string { + if o.Version != "" { + // Most distributions set VERSION + return o.Version + } else if o.VersionId != "" { + // Some only set VERSION_ID (Alpine Linux for example) + return o.VersionId + } else if o.BuildId != "" { + // Others only set BUILD_ID (Arch Linux for example) + return o.BuildId + } else { + return "(unknown)" + } +} + +// readOsRelease reads and parses the os-release file. +func readOsRelease() (*osRelease, error) { + for _, path := range []string{"/etc/os-release", "/usr/lib/os-release"} { + f, err := os.Open(path) + if err != nil { + if os.IsNotExist(err) { + continue // Try next path. + } else { + return nil, err + } + } + + o := &osRelease{ + Name: "Linux", // Suggested default as per os-release(5) man page. + } + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, "#") { + continue // Ignore comment. + } + + parts := strings.SplitN(line, "=", 2) + if len(parts) != 2 { + continue // Ignore empty or possibly malformed line. + } + + key := parts[0] + val := parts[1] + + // Unquote strings. This isn't fully compliant with the specification which allows using some shell escape + // sequences. However, typically quotes are only used to allow whitespace within the value. + if len(val) >= 2 && (val[0] == '"' || val[0] == '\'') && val[0] == val[len(val)-1] { + val = val[1 : len(val)-1] + } + + switch key { + case "NAME": + o.Name = val + case "VERSION": + o.Version = val + case "VERSION_ID": + o.VersionId = val + case "BUILD_ID": + o.BuildId = val + } + } + if err := scanner.Err(); err != nil { + return nil, err + } + + return o, nil + } + + return nil, errors.New("os-release file not found") +} |