diff options
Diffstat (limited to 'pkg/com/com.go')
-rw-r--r-- | pkg/com/com.go | 82 |
1 files changed, 82 insertions, 0 deletions
diff --git a/pkg/com/com.go b/pkg/com/com.go new file mode 100644 index 0000000..9e0a698 --- /dev/null +++ b/pkg/com/com.go @@ -0,0 +1,82 @@ +package com + +import ( + "context" + "github.com/icinga/icingadb/pkg/contracts" + "golang.org/x/sync/errgroup" +) + +// WaitAsync calls Wait() on the passed Waiter in a new goroutine and +// sends the first non-nil error (if any) to the returned channel. +// The returned channel is always closed when the Waiter is done. +func WaitAsync(w contracts.Waiter) <-chan error { + errs := make(chan error, 1) + + go func() { + defer close(errs) + + if e := w.Wait(); e != nil { + errs <- e + } + }() + + return errs +} + +// ErrgroupReceive adds a goroutine to the specified group that +// returns the first non-nil error (if any) from the specified channel. +// If the channel is closed, it will return nil. +func ErrgroupReceive(g *errgroup.Group, err <-chan error) { + g.Go(func() error { + if e := <-err; e != nil { + return e + } + + return nil + }) +} + +// CopyFirst asynchronously forwards all items from input to forward and synchronously returns the first item. +func CopyFirst( + ctx context.Context, input <-chan contracts.Entity, +) (first contracts.Entity, forward <-chan contracts.Entity, err error) { + var ok bool + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + case first, ok = <-input: + } + + if !ok { + return + } + + // Buffer of one because we receive an entity and send it back immediately. + fwd := make(chan contracts.Entity, 1) + fwd <- first + + forward = fwd + + go func() { + defer close(fwd) + + for { + select { + case <-ctx.Done(): + return + case e, ok := <-input: + if !ok { + return + } + + select { + case <-ctx.Done(): + return + case fwd <- e: + } + } + } + }() + + return +} |