diff options
Diffstat (limited to 'pkg/com')
-rw-r--r-- | pkg/com/atomic.go | 38 | ||||
-rw-r--r-- | pkg/com/bulker.go | 187 | ||||
-rw-r--r-- | pkg/com/com.go | 82 | ||||
-rw-r--r-- | pkg/com/cond.go | 90 | ||||
-rw-r--r-- | pkg/com/counter.go | 48 |
5 files changed, 445 insertions, 0 deletions
diff --git a/pkg/com/atomic.go b/pkg/com/atomic.go new file mode 100644 index 0000000..316413d --- /dev/null +++ b/pkg/com/atomic.go @@ -0,0 +1,38 @@ +package com + +import "sync/atomic" + +// Atomic is a type-safe wrapper around atomic.Value. +type Atomic[T any] struct { + v atomic.Value +} + +func (a *Atomic[T]) Load() (_ T, ok bool) { + if v, ok := a.v.Load().(box[T]); ok { + return v.v, true + } + + return +} + +func (a *Atomic[T]) Store(v T) { + a.v.Store(box[T]{v}) +} + +func (a *Atomic[T]) Swap(new T) (old T, ok bool) { + if old, ok := a.v.Swap(box[T]{new}).(box[T]); ok { + return old.v, true + } + + return +} + +func (a *Atomic[T]) CompareAndSwap(old, new T) (swapped bool) { + return a.v.CompareAndSwap(box[T]{old}, box[T]{new}) +} + +// box allows, for the case T is an interface, nil values and values of different specific types implementing T +// to be stored in Atomic[T]#v (bypassing atomic.Value#Store()'s policy) by wrapping it (into a non-interface). +type box[T any] struct { + v T +} diff --git a/pkg/com/bulker.go b/pkg/com/bulker.go new file mode 100644 index 0000000..5e8de52 --- /dev/null +++ b/pkg/com/bulker.go @@ -0,0 +1,187 @@ +package com + +import ( + "context" + "github.com/icinga/icingadb/pkg/contracts" + "golang.org/x/sync/errgroup" + "sync" + "time" +) + +// BulkChunkSplitPolicy is a state machine which tracks the items of a chunk a bulker assembles. +// A call takes an item for the current chunk into account. +// Output true indicates that the state machine was reset first and the bulker +// shall finish the current chunk now (not e.g. once $size is reached) without the given item. +type BulkChunkSplitPolicy[T any] func(T) bool + +type BulkChunkSplitPolicyFactory[T any] func() BulkChunkSplitPolicy[T] + +// NeverSplit returns a pseudo state machine which never demands splitting. +func NeverSplit[T any]() BulkChunkSplitPolicy[T] { + return neverSplit[T] +} + +// SplitOnDupId returns a state machine which tracks the inputs' IDs. +// Once an already seen input arrives, it demands splitting. +func SplitOnDupId[T contracts.IDer]() BulkChunkSplitPolicy[T] { + seenIds := map[string]struct{}{} + + return func(ider T) bool { + id := ider.ID().String() + + _, ok := seenIds[id] + if ok { + seenIds = map[string]struct{}{id: {}} + } else { + seenIds[id] = struct{}{} + } + + return ok + } +} + +func neverSplit[T any](T) bool { + return false +} + +// Bulker reads all values from a channel and streams them in chunks into a Bulk channel. +type Bulker[T any] struct { + ch chan []T + ctx context.Context + mu sync.Mutex +} + +// NewBulker returns a new Bulker and starts streaming. +func NewBulker[T any]( + ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T], +) *Bulker[T] { + b := &Bulker[T]{ + ch: make(chan []T), + ctx: ctx, + mu: sync.Mutex{}, + } + + go b.run(ch, count, splitPolicyFactory) + + return b +} + +// Bulk returns the channel on which the bulks are delivered. +func (b *Bulker[T]) Bulk() <-chan []T { + return b.ch +} + +func (b *Bulker[T]) run(ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T]) { + defer close(b.ch) + + bufCh := make(chan T, count) + splitPolicy := splitPolicyFactory() + g, ctx := errgroup.WithContext(b.ctx) + + g.Go(func() error { + defer close(bufCh) + + for { + select { + case v, ok := <-ch: + if !ok { + return nil + } + + bufCh <- v + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + g.Go(func() error { + for done := false; !done; { + buf := make([]T, 0, count) + timeout := time.After(256 * time.Millisecond) + + for drain := true; drain && len(buf) < count; { + select { + case v, ok := <-bufCh: + if !ok { + drain = false + done = true + + break + } + + if splitPolicy(v) { + if len(buf) > 0 { + b.ch <- buf + buf = make([]T, 0, count) + } + + timeout = time.After(256 * time.Millisecond) + } + + buf = append(buf, v) + case <-timeout: + drain = false + case <-ctx.Done(): + return ctx.Err() + } + } + + if len(buf) > 0 { + b.ch <- buf + } + + splitPolicy = splitPolicyFactory() + } + + return nil + }) + + // We don't expect an error here. + // We only use errgroup for the encapsulated use of sync.WaitGroup. + _ = g.Wait() +} + +// Bulk reads all values from a channel and streams them in chunks into a returned channel. +func Bulk[T any]( + ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T], +) <-chan []T { + if count <= 1 { + return oneBulk(ctx, ch) + } + + return NewBulker(ctx, ch, count, splitPolicyFactory).Bulk() +} + +// oneBulk operates just as NewBulker(ctx, ch, 1, splitPolicy).Bulk(), +// but without the overhead of the actual bulk creation with a buffer channel, timeout and BulkChunkSplitPolicy. +func oneBulk[T any](ctx context.Context, ch <-chan T) <-chan []T { + out := make(chan []T) + go func() { + defer close(out) + + for { + select { + case item, ok := <-ch: + if !ok { + return + } + + select { + case out <- []T{item}: + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } + }() + + return out +} + +var ( + _ BulkChunkSplitPolicyFactory[struct{}] = NeverSplit[struct{}] + _ BulkChunkSplitPolicyFactory[contracts.Entity] = SplitOnDupId[contracts.Entity] +) diff --git a/pkg/com/com.go b/pkg/com/com.go new file mode 100644 index 0000000..9e0a698 --- /dev/null +++ b/pkg/com/com.go @@ -0,0 +1,82 @@ +package com + +import ( + "context" + "github.com/icinga/icingadb/pkg/contracts" + "golang.org/x/sync/errgroup" +) + +// WaitAsync calls Wait() on the passed Waiter in a new goroutine and +// sends the first non-nil error (if any) to the returned channel. +// The returned channel is always closed when the Waiter is done. +func WaitAsync(w contracts.Waiter) <-chan error { + errs := make(chan error, 1) + + go func() { + defer close(errs) + + if e := w.Wait(); e != nil { + errs <- e + } + }() + + return errs +} + +// ErrgroupReceive adds a goroutine to the specified group that +// returns the first non-nil error (if any) from the specified channel. +// If the channel is closed, it will return nil. +func ErrgroupReceive(g *errgroup.Group, err <-chan error) { + g.Go(func() error { + if e := <-err; e != nil { + return e + } + + return nil + }) +} + +// CopyFirst asynchronously forwards all items from input to forward and synchronously returns the first item. +func CopyFirst( + ctx context.Context, input <-chan contracts.Entity, +) (first contracts.Entity, forward <-chan contracts.Entity, err error) { + var ok bool + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + case first, ok = <-input: + } + + if !ok { + return + } + + // Buffer of one because we receive an entity and send it back immediately. + fwd := make(chan contracts.Entity, 1) + fwd <- first + + forward = fwd + + go func() { + defer close(fwd) + + for { + select { + case <-ctx.Done(): + return + case e, ok := <-input: + if !ok { + return + } + + select { + case <-ctx.Done(): + return + case fwd <- e: + } + } + } + }() + + return +} diff --git a/pkg/com/cond.go b/pkg/com/cond.go new file mode 100644 index 0000000..72ba347 --- /dev/null +++ b/pkg/com/cond.go @@ -0,0 +1,90 @@ +package com + +import ( + "context" + "github.com/pkg/errors" +) + +// Cond implements a channel-based synchronization for goroutines that wait for signals or send them. +// Internally based on a controller loop that handles the synchronization of new listeners and signal propagation, +// which is only started when NewCond is called. Thus the zero value cannot be used. +type Cond struct { + broadcast chan struct{} + done chan struct{} + cancel context.CancelFunc + listeners chan chan struct{} +} + +// NewCond returns a new Cond and starts the controller loop. +func NewCond(ctx context.Context) *Cond { + ctx, cancel := context.WithCancel(ctx) + + c := &Cond{ + broadcast: make(chan struct{}), + cancel: cancel, + done: make(chan struct{}), + listeners: make(chan chan struct{}), + } + + go c.controller(ctx) + + return c +} + +// Broadcast sends a signal to all current listeners by closing the previously returned channel from Wait. +// Panics if the controller loop has already ended. +func (c *Cond) Broadcast() { + select { + case c.broadcast <- struct{}{}: + case <-c.done: + panic(errors.New("condition closed")) + } +} + +// Close stops the controller loop, waits for it to finish, and returns an error if any. +// Implements the io.Closer interface. +func (c *Cond) Close() error { + c.cancel() + <-c.done + + return nil +} + +// Done returns a channel that will be closed when the controller loop has ended. +func (c *Cond) Done() <-chan struct{} { + return c.done +} + +// Wait returns a channel that is closed with the next signal. +// Panics if the controller loop has already ended. +func (c *Cond) Wait() <-chan struct{} { + select { + case l := <-c.listeners: + return l + case <-c.done: + panic(errors.New("condition closed")) + } +} + +// controller loop. +func (c *Cond) controller(ctx context.Context) { + defer close(c.done) + + // Note that the notify channel does not close when the controller loop ends + // in order not to notify pending listeners. + notify := make(chan struct{}) + + for { + select { + case <-c.broadcast: + // Close channel to notify all current listeners. + close(notify) + // Create a new channel for the next listeners. + notify = make(chan struct{}) + case c.listeners <- notify: + // A new listener received the channel. + case <-ctx.Done(): + return + } + } +} diff --git a/pkg/com/counter.go b/pkg/com/counter.go new file mode 100644 index 0000000..52f9f7f --- /dev/null +++ b/pkg/com/counter.go @@ -0,0 +1,48 @@ +package com + +import ( + "sync" + "sync/atomic" +) + +// Counter implements an atomic counter. +type Counter struct { + value uint64 + mu sync.Mutex // Protects total. + total uint64 +} + +// Add adds the given delta to the counter. +func (c *Counter) Add(delta uint64) { + atomic.AddUint64(&c.value, delta) +} + +// Inc increments the counter by one. +func (c *Counter) Inc() { + c.Add(1) +} + +// Reset resets the counter to 0 and returns its previous value. +// Does not reset the total value returned from Total. +func (c *Counter) Reset() uint64 { + c.mu.Lock() + defer c.mu.Unlock() + + v := atomic.SwapUint64(&c.value, 0) + c.total += v + + return v +} + +// Total returns the total counter value. +func (c *Counter) Total() uint64 { + c.mu.Lock() + defer c.mu.Unlock() + + return c.total + c.Val() +} + +// Val returns the current counter value. +func (c *Counter) Val() uint64 { + return atomic.LoadUint64(&c.value) +} |