summaryrefslogtreecommitdiffstats
path: root/pkg/icingadb/dump_signals.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/icingadb/dump_signals.go')
-rw-r--r--pkg/icingadb/dump_signals.go136
1 files changed, 136 insertions, 0 deletions
diff --git a/pkg/icingadb/dump_signals.go b/pkg/icingadb/dump_signals.go
new file mode 100644
index 0000000..bce1aef
--- /dev/null
+++ b/pkg/icingadb/dump_signals.go
@@ -0,0 +1,136 @@
+package icingadb
+
+import (
+ "context"
+ "github.com/go-redis/redis/v8"
+ "github.com/icinga/icingadb/pkg/icingaredis"
+ "github.com/icinga/icingadb/pkg/logging"
+ "github.com/pkg/errors"
+ "go.uber.org/zap"
+ "sync"
+)
+
+// DumpSignals reads dump signals from a Redis stream via Listen.
+// Dump-done signals are passed on via Done channels, while InProgress must be checked for dump-wip signals.
+type DumpSignals struct {
+ redis *icingaredis.Client
+ logger *logging.Logger
+ mutex sync.Mutex
+ doneCh map[string]chan struct{}
+ allDoneCh chan struct{}
+ inProgressCh chan struct{}
+}
+
+// NewDumpSignals returns new DumpSignals.
+func NewDumpSignals(redis *icingaredis.Client, logger *logging.Logger) *DumpSignals {
+ return &DumpSignals{
+ redis: redis,
+ logger: logger,
+ doneCh: make(map[string]chan struct{}),
+ inProgressCh: make(chan struct{}),
+ }
+}
+
+// Listen starts listening for dump signals in the icinga:dump Redis stream. When a done signal is received, this is
+// signaled via the channels returned from the Done function.
+//
+// If a wip signal is received after a done signal was passed on via the Done function, this is signaled via the
+// InProgress function and this function returns with err == nil. In this case, all other signals are invalidated.
+// It is up to the caller to pass on this information, for example by cancelling derived contexts.
+//
+// This function may only be called once for each DumpSignals object. To listen for a new iteration of dump signals, a new
+// DumpSignals instance must be created.
+func (s *DumpSignals) Listen(ctx context.Context) error {
+ // Closing a channel twice results in a panic. This function takes a chan struct{} and closes it unless it is
+ // already closed. In this case it just does nothing. This function assumes that the channel is never written to
+ // and that there are no concurrent attempts to close the channel.
+ safeClose := func(ch chan struct{}) {
+ select {
+ case <-ch:
+ // Reading from a closed channel returns immediately, therefore don't close it again.
+ default:
+ close(ch)
+ }
+ }
+
+ lastStreamId := "0-0"
+ anyDoneSent := false
+
+ for {
+ if err := ctx.Err(); err != nil {
+ return errors.Wrap(err, "can't listen for dump signals")
+ }
+
+ streams, err := s.redis.XReadUntilResult(ctx, &redis.XReadArgs{
+ Streams: []string{"icinga:dump", lastStreamId},
+ })
+ if err != nil {
+ return errors.Wrap(err, "can't read dump signals")
+ }
+
+ for _, entry := range streams[0].Messages {
+ lastStreamId = entry.ID
+ key := entry.Values["key"].(string)
+ done := entry.Values["state"].(string) == "done"
+
+ s.logger.Debugw("Received dump signal from Redis", zap.String("key", key), zap.Bool("done", done))
+
+ if done {
+ if key == "*" {
+ if s.allDoneCh == nil {
+ s.mutex.Lock()
+
+ // Set s.allDoneCh to signal for all future listeners that we've received an all-done signal.
+ s.allDoneCh = make(chan struct{})
+ close(s.allDoneCh)
+
+ // Notify all existing listeners.
+ for _, ch := range s.doneCh {
+ safeClose(ch)
+ }
+
+ s.mutex.Unlock()
+ }
+ } else {
+ s.mutex.Lock()
+ if ch, ok := s.doneCh[key]; ok {
+ safeClose(ch)
+ }
+ s.mutex.Unlock()
+ }
+ anyDoneSent = true
+ } else if anyDoneSent {
+ // Received a wip signal after handing out any done signal via one of the channels returned by Done,
+ // signal that a new dump is in progress. This treats every state=wip as if it has key=*, which is the
+ // only key for which state=wip is currently sent by Icinga 2.
+ close(s.inProgressCh)
+ return nil
+ }
+ }
+ }
+}
+
+// Done returns a channel that is closed when the given key receives a done dump signal.
+func (s *DumpSignals) Done(key string) <-chan struct{} {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+
+ if s.allDoneCh != nil {
+ // An all done-signal was already received, don't care about individual key anymore.
+ return s.allDoneCh
+ } else if ch, ok := s.doneCh[key]; ok {
+ // Return existing wait channel for this key.
+ return ch
+ } else {
+ // First request for this key, create new wait channel.
+ ch = make(chan struct{})
+ s.doneCh[key] = ch
+ return ch
+ }
+}
+
+// InProgress returns a channel that is closed when a new dump is in progress after done signals were sent to channels
+// returned by Done.
+func (s *DumpSignals) InProgress() <-chan struct{} {
+ return s.inProgressCh
+}