summaryrefslogtreecommitdiffstats
path: root/pkg/com/cond.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/com/cond.go')
-rw-r--r--pkg/com/cond.go90
1 files changed, 90 insertions, 0 deletions
diff --git a/pkg/com/cond.go b/pkg/com/cond.go
new file mode 100644
index 0000000..72ba347
--- /dev/null
+++ b/pkg/com/cond.go
@@ -0,0 +1,90 @@
+package com
+
+import (
+ "context"
+ "github.com/pkg/errors"
+)
+
+// Cond implements a channel-based synchronization for goroutines that wait for signals or send them.
+// Internally based on a controller loop that handles the synchronization of new listeners and signal propagation,
+// which is only started when NewCond is called. Thus the zero value cannot be used.
+type Cond struct {
+ broadcast chan struct{}
+ done chan struct{}
+ cancel context.CancelFunc
+ listeners chan chan struct{}
+}
+
+// NewCond returns a new Cond and starts the controller loop.
+func NewCond(ctx context.Context) *Cond {
+ ctx, cancel := context.WithCancel(ctx)
+
+ c := &Cond{
+ broadcast: make(chan struct{}),
+ cancel: cancel,
+ done: make(chan struct{}),
+ listeners: make(chan chan struct{}),
+ }
+
+ go c.controller(ctx)
+
+ return c
+}
+
+// Broadcast sends a signal to all current listeners by closing the previously returned channel from Wait.
+// Panics if the controller loop has already ended.
+func (c *Cond) Broadcast() {
+ select {
+ case c.broadcast <- struct{}{}:
+ case <-c.done:
+ panic(errors.New("condition closed"))
+ }
+}
+
+// Close stops the controller loop, waits for it to finish, and returns an error if any.
+// Implements the io.Closer interface.
+func (c *Cond) Close() error {
+ c.cancel()
+ <-c.done
+
+ return nil
+}
+
+// Done returns a channel that will be closed when the controller loop has ended.
+func (c *Cond) Done() <-chan struct{} {
+ return c.done
+}
+
+// Wait returns a channel that is closed with the next signal.
+// Panics if the controller loop has already ended.
+func (c *Cond) Wait() <-chan struct{} {
+ select {
+ case l := <-c.listeners:
+ return l
+ case <-c.done:
+ panic(errors.New("condition closed"))
+ }
+}
+
+// controller loop.
+func (c *Cond) controller(ctx context.Context) {
+ defer close(c.done)
+
+ // Note that the notify channel does not close when the controller loop ends
+ // in order not to notify pending listeners.
+ notify := make(chan struct{})
+
+ for {
+ select {
+ case <-c.broadcast:
+ // Close channel to notify all current listeners.
+ close(notify)
+ // Create a new channel for the next listeners.
+ notify = make(chan struct{})
+ case c.listeners <- notify:
+ // A new listener received the channel.
+ case <-ctx.Done():
+ return
+ }
+ }
+}