diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 12:36:04 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 12:36:04 +0000 |
commit | b09c6d56832eb1718c07d74abf3bc6ae3fe4e030 (patch) | |
tree | d2caec2610d4ea887803ec9e9c3cd77136c448ba /pkg/com/cond.go | |
parent | Initial commit. (diff) | |
download | icingadb-upstream.tar.xz icingadb-upstream.zip |
Adding upstream version 1.1.0.upstream/1.1.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'pkg/com/cond.go')
-rw-r--r-- | pkg/com/cond.go | 90 |
1 files changed, 90 insertions, 0 deletions
diff --git a/pkg/com/cond.go b/pkg/com/cond.go new file mode 100644 index 0000000..72ba347 --- /dev/null +++ b/pkg/com/cond.go @@ -0,0 +1,90 @@ +package com + +import ( + "context" + "github.com/pkg/errors" +) + +// Cond implements a channel-based synchronization for goroutines that wait for signals or send them. +// Internally based on a controller loop that handles the synchronization of new listeners and signal propagation, +// which is only started when NewCond is called. Thus the zero value cannot be used. +type Cond struct { + broadcast chan struct{} + done chan struct{} + cancel context.CancelFunc + listeners chan chan struct{} +} + +// NewCond returns a new Cond and starts the controller loop. +func NewCond(ctx context.Context) *Cond { + ctx, cancel := context.WithCancel(ctx) + + c := &Cond{ + broadcast: make(chan struct{}), + cancel: cancel, + done: make(chan struct{}), + listeners: make(chan chan struct{}), + } + + go c.controller(ctx) + + return c +} + +// Broadcast sends a signal to all current listeners by closing the previously returned channel from Wait. +// Panics if the controller loop has already ended. +func (c *Cond) Broadcast() { + select { + case c.broadcast <- struct{}{}: + case <-c.done: + panic(errors.New("condition closed")) + } +} + +// Close stops the controller loop, waits for it to finish, and returns an error if any. +// Implements the io.Closer interface. +func (c *Cond) Close() error { + c.cancel() + <-c.done + + return nil +} + +// Done returns a channel that will be closed when the controller loop has ended. +func (c *Cond) Done() <-chan struct{} { + return c.done +} + +// Wait returns a channel that is closed with the next signal. +// Panics if the controller loop has already ended. +func (c *Cond) Wait() <-chan struct{} { + select { + case l := <-c.listeners: + return l + case <-c.done: + panic(errors.New("condition closed")) + } +} + +// controller loop. +func (c *Cond) controller(ctx context.Context) { + defer close(c.done) + + // Note that the notify channel does not close when the controller loop ends + // in order not to notify pending listeners. + notify := make(chan struct{}) + + for { + select { + case <-c.broadcast: + // Close channel to notify all current listeners. + close(notify) + // Create a new channel for the next listeners. + notify = make(chan struct{}) + case c.listeners <- notify: + // A new listener received the channel. + case <-ctx.Done(): + return + } + } +} |