summaryrefslogtreecommitdiffstats
path: root/pkg/com/com.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/com/com.go')
-rw-r--r--pkg/com/com.go82
1 files changed, 82 insertions, 0 deletions
diff --git a/pkg/com/com.go b/pkg/com/com.go
new file mode 100644
index 0000000..9e0a698
--- /dev/null
+++ b/pkg/com/com.go
@@ -0,0 +1,82 @@
+package com
+
+import (
+ "context"
+ "github.com/icinga/icingadb/pkg/contracts"
+ "golang.org/x/sync/errgroup"
+)
+
+// WaitAsync calls Wait() on the passed Waiter in a new goroutine and
+// sends the first non-nil error (if any) to the returned channel.
+// The returned channel is always closed when the Waiter is done.
+func WaitAsync(w contracts.Waiter) <-chan error {
+ errs := make(chan error, 1)
+
+ go func() {
+ defer close(errs)
+
+ if e := w.Wait(); e != nil {
+ errs <- e
+ }
+ }()
+
+ return errs
+}
+
+// ErrgroupReceive adds a goroutine to the specified group that
+// returns the first non-nil error (if any) from the specified channel.
+// If the channel is closed, it will return nil.
+func ErrgroupReceive(g *errgroup.Group, err <-chan error) {
+ g.Go(func() error {
+ if e := <-err; e != nil {
+ return e
+ }
+
+ return nil
+ })
+}
+
+// CopyFirst asynchronously forwards all items from input to forward and synchronously returns the first item.
+func CopyFirst(
+ ctx context.Context, input <-chan contracts.Entity,
+) (first contracts.Entity, forward <-chan contracts.Entity, err error) {
+ var ok bool
+ select {
+ case <-ctx.Done():
+ return nil, nil, ctx.Err()
+ case first, ok = <-input:
+ }
+
+ if !ok {
+ return
+ }
+
+ // Buffer of one because we receive an entity and send it back immediately.
+ fwd := make(chan contracts.Entity, 1)
+ fwd <- first
+
+ forward = fwd
+
+ go func() {
+ defer close(fwd)
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case e, ok := <-input:
+ if !ok {
+ return
+ }
+
+ select {
+ case <-ctx.Done():
+ return
+ case fwd <- e:
+ }
+ }
+ }
+ }()
+
+ return
+}