summaryrefslogtreecommitdiffstats
path: root/pkg/com
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 12:36:04 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 12:36:04 +0000
commitb09c6d56832eb1718c07d74abf3bc6ae3fe4e030 (patch)
treed2caec2610d4ea887803ec9e9c3cd77136c448ba /pkg/com
parentInitial commit. (diff)
downloadicingadb-upstream.tar.xz
icingadb-upstream.zip
Adding upstream version 1.1.0.upstream/1.1.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'pkg/com')
-rw-r--r--pkg/com/atomic.go38
-rw-r--r--pkg/com/bulker.go187
-rw-r--r--pkg/com/com.go82
-rw-r--r--pkg/com/cond.go90
-rw-r--r--pkg/com/counter.go48
5 files changed, 445 insertions, 0 deletions
diff --git a/pkg/com/atomic.go b/pkg/com/atomic.go
new file mode 100644
index 0000000..316413d
--- /dev/null
+++ b/pkg/com/atomic.go
@@ -0,0 +1,38 @@
+package com
+
+import "sync/atomic"
+
+// Atomic is a type-safe wrapper around atomic.Value.
+type Atomic[T any] struct {
+ v atomic.Value
+}
+
+func (a *Atomic[T]) Load() (_ T, ok bool) {
+ if v, ok := a.v.Load().(box[T]); ok {
+ return v.v, true
+ }
+
+ return
+}
+
+func (a *Atomic[T]) Store(v T) {
+ a.v.Store(box[T]{v})
+}
+
+func (a *Atomic[T]) Swap(new T) (old T, ok bool) {
+ if old, ok := a.v.Swap(box[T]{new}).(box[T]); ok {
+ return old.v, true
+ }
+
+ return
+}
+
+func (a *Atomic[T]) CompareAndSwap(old, new T) (swapped bool) {
+ return a.v.CompareAndSwap(box[T]{old}, box[T]{new})
+}
+
+// box allows, for the case T is an interface, nil values and values of different specific types implementing T
+// to be stored in Atomic[T]#v (bypassing atomic.Value#Store()'s policy) by wrapping it (into a non-interface).
+type box[T any] struct {
+ v T
+}
diff --git a/pkg/com/bulker.go b/pkg/com/bulker.go
new file mode 100644
index 0000000..5e8de52
--- /dev/null
+++ b/pkg/com/bulker.go
@@ -0,0 +1,187 @@
+package com
+
+import (
+ "context"
+ "github.com/icinga/icingadb/pkg/contracts"
+ "golang.org/x/sync/errgroup"
+ "sync"
+ "time"
+)
+
+// BulkChunkSplitPolicy is a state machine which tracks the items of a chunk a bulker assembles.
+// A call takes an item for the current chunk into account.
+// Output true indicates that the state machine was reset first and the bulker
+// shall finish the current chunk now (not e.g. once $size is reached) without the given item.
+type BulkChunkSplitPolicy[T any] func(T) bool
+
+type BulkChunkSplitPolicyFactory[T any] func() BulkChunkSplitPolicy[T]
+
+// NeverSplit returns a pseudo state machine which never demands splitting.
+func NeverSplit[T any]() BulkChunkSplitPolicy[T] {
+ return neverSplit[T]
+}
+
+// SplitOnDupId returns a state machine which tracks the inputs' IDs.
+// Once an already seen input arrives, it demands splitting.
+func SplitOnDupId[T contracts.IDer]() BulkChunkSplitPolicy[T] {
+ seenIds := map[string]struct{}{}
+
+ return func(ider T) bool {
+ id := ider.ID().String()
+
+ _, ok := seenIds[id]
+ if ok {
+ seenIds = map[string]struct{}{id: {}}
+ } else {
+ seenIds[id] = struct{}{}
+ }
+
+ return ok
+ }
+}
+
+func neverSplit[T any](T) bool {
+ return false
+}
+
+// Bulker reads all values from a channel and streams them in chunks into a Bulk channel.
+type Bulker[T any] struct {
+ ch chan []T
+ ctx context.Context
+ mu sync.Mutex
+}
+
+// NewBulker returns a new Bulker and starts streaming.
+func NewBulker[T any](
+ ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T],
+) *Bulker[T] {
+ b := &Bulker[T]{
+ ch: make(chan []T),
+ ctx: ctx,
+ mu: sync.Mutex{},
+ }
+
+ go b.run(ch, count, splitPolicyFactory)
+
+ return b
+}
+
+// Bulk returns the channel on which the bulks are delivered.
+func (b *Bulker[T]) Bulk() <-chan []T {
+ return b.ch
+}
+
+func (b *Bulker[T]) run(ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T]) {
+ defer close(b.ch)
+
+ bufCh := make(chan T, count)
+ splitPolicy := splitPolicyFactory()
+ g, ctx := errgroup.WithContext(b.ctx)
+
+ g.Go(func() error {
+ defer close(bufCh)
+
+ for {
+ select {
+ case v, ok := <-ch:
+ if !ok {
+ return nil
+ }
+
+ bufCh <- v
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ }
+ })
+
+ g.Go(func() error {
+ for done := false; !done; {
+ buf := make([]T, 0, count)
+ timeout := time.After(256 * time.Millisecond)
+
+ for drain := true; drain && len(buf) < count; {
+ select {
+ case v, ok := <-bufCh:
+ if !ok {
+ drain = false
+ done = true
+
+ break
+ }
+
+ if splitPolicy(v) {
+ if len(buf) > 0 {
+ b.ch <- buf
+ buf = make([]T, 0, count)
+ }
+
+ timeout = time.After(256 * time.Millisecond)
+ }
+
+ buf = append(buf, v)
+ case <-timeout:
+ drain = false
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ }
+
+ if len(buf) > 0 {
+ b.ch <- buf
+ }
+
+ splitPolicy = splitPolicyFactory()
+ }
+
+ return nil
+ })
+
+ // We don't expect an error here.
+ // We only use errgroup for the encapsulated use of sync.WaitGroup.
+ _ = g.Wait()
+}
+
+// Bulk reads all values from a channel and streams them in chunks into a returned channel.
+func Bulk[T any](
+ ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T],
+) <-chan []T {
+ if count <= 1 {
+ return oneBulk(ctx, ch)
+ }
+
+ return NewBulker(ctx, ch, count, splitPolicyFactory).Bulk()
+}
+
+// oneBulk operates just as NewBulker(ctx, ch, 1, splitPolicy).Bulk(),
+// but without the overhead of the actual bulk creation with a buffer channel, timeout and BulkChunkSplitPolicy.
+func oneBulk[T any](ctx context.Context, ch <-chan T) <-chan []T {
+ out := make(chan []T)
+ go func() {
+ defer close(out)
+
+ for {
+ select {
+ case item, ok := <-ch:
+ if !ok {
+ return
+ }
+
+ select {
+ case out <- []T{item}:
+ case <-ctx.Done():
+ return
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ return out
+}
+
+var (
+ _ BulkChunkSplitPolicyFactory[struct{}] = NeverSplit[struct{}]
+ _ BulkChunkSplitPolicyFactory[contracts.Entity] = SplitOnDupId[contracts.Entity]
+)
diff --git a/pkg/com/com.go b/pkg/com/com.go
new file mode 100644
index 0000000..9e0a698
--- /dev/null
+++ b/pkg/com/com.go
@@ -0,0 +1,82 @@
+package com
+
+import (
+ "context"
+ "github.com/icinga/icingadb/pkg/contracts"
+ "golang.org/x/sync/errgroup"
+)
+
+// WaitAsync calls Wait() on the passed Waiter in a new goroutine and
+// sends the first non-nil error (if any) to the returned channel.
+// The returned channel is always closed when the Waiter is done.
+func WaitAsync(w contracts.Waiter) <-chan error {
+ errs := make(chan error, 1)
+
+ go func() {
+ defer close(errs)
+
+ if e := w.Wait(); e != nil {
+ errs <- e
+ }
+ }()
+
+ return errs
+}
+
+// ErrgroupReceive adds a goroutine to the specified group that
+// returns the first non-nil error (if any) from the specified channel.
+// If the channel is closed, it will return nil.
+func ErrgroupReceive(g *errgroup.Group, err <-chan error) {
+ g.Go(func() error {
+ if e := <-err; e != nil {
+ return e
+ }
+
+ return nil
+ })
+}
+
+// CopyFirst asynchronously forwards all items from input to forward and synchronously returns the first item.
+func CopyFirst(
+ ctx context.Context, input <-chan contracts.Entity,
+) (first contracts.Entity, forward <-chan contracts.Entity, err error) {
+ var ok bool
+ select {
+ case <-ctx.Done():
+ return nil, nil, ctx.Err()
+ case first, ok = <-input:
+ }
+
+ if !ok {
+ return
+ }
+
+ // Buffer of one because we receive an entity and send it back immediately.
+ fwd := make(chan contracts.Entity, 1)
+ fwd <- first
+
+ forward = fwd
+
+ go func() {
+ defer close(fwd)
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case e, ok := <-input:
+ if !ok {
+ return
+ }
+
+ select {
+ case <-ctx.Done():
+ return
+ case fwd <- e:
+ }
+ }
+ }
+ }()
+
+ return
+}
diff --git a/pkg/com/cond.go b/pkg/com/cond.go
new file mode 100644
index 0000000..72ba347
--- /dev/null
+++ b/pkg/com/cond.go
@@ -0,0 +1,90 @@
+package com
+
+import (
+ "context"
+ "github.com/pkg/errors"
+)
+
+// Cond implements a channel-based synchronization for goroutines that wait for signals or send them.
+// Internally based on a controller loop that handles the synchronization of new listeners and signal propagation,
+// which is only started when NewCond is called. Thus the zero value cannot be used.
+type Cond struct {
+ broadcast chan struct{}
+ done chan struct{}
+ cancel context.CancelFunc
+ listeners chan chan struct{}
+}
+
+// NewCond returns a new Cond and starts the controller loop.
+func NewCond(ctx context.Context) *Cond {
+ ctx, cancel := context.WithCancel(ctx)
+
+ c := &Cond{
+ broadcast: make(chan struct{}),
+ cancel: cancel,
+ done: make(chan struct{}),
+ listeners: make(chan chan struct{}),
+ }
+
+ go c.controller(ctx)
+
+ return c
+}
+
+// Broadcast sends a signal to all current listeners by closing the previously returned channel from Wait.
+// Panics if the controller loop has already ended.
+func (c *Cond) Broadcast() {
+ select {
+ case c.broadcast <- struct{}{}:
+ case <-c.done:
+ panic(errors.New("condition closed"))
+ }
+}
+
+// Close stops the controller loop, waits for it to finish, and returns an error if any.
+// Implements the io.Closer interface.
+func (c *Cond) Close() error {
+ c.cancel()
+ <-c.done
+
+ return nil
+}
+
+// Done returns a channel that will be closed when the controller loop has ended.
+func (c *Cond) Done() <-chan struct{} {
+ return c.done
+}
+
+// Wait returns a channel that is closed with the next signal.
+// Panics if the controller loop has already ended.
+func (c *Cond) Wait() <-chan struct{} {
+ select {
+ case l := <-c.listeners:
+ return l
+ case <-c.done:
+ panic(errors.New("condition closed"))
+ }
+}
+
+// controller loop.
+func (c *Cond) controller(ctx context.Context) {
+ defer close(c.done)
+
+ // Note that the notify channel does not close when the controller loop ends
+ // in order not to notify pending listeners.
+ notify := make(chan struct{})
+
+ for {
+ select {
+ case <-c.broadcast:
+ // Close channel to notify all current listeners.
+ close(notify)
+ // Create a new channel for the next listeners.
+ notify = make(chan struct{})
+ case c.listeners <- notify:
+ // A new listener received the channel.
+ case <-ctx.Done():
+ return
+ }
+ }
+}
diff --git a/pkg/com/counter.go b/pkg/com/counter.go
new file mode 100644
index 0000000..52f9f7f
--- /dev/null
+++ b/pkg/com/counter.go
@@ -0,0 +1,48 @@
+package com
+
+import (
+ "sync"
+ "sync/atomic"
+)
+
+// Counter implements an atomic counter.
+type Counter struct {
+ value uint64
+ mu sync.Mutex // Protects total.
+ total uint64
+}
+
+// Add adds the given delta to the counter.
+func (c *Counter) Add(delta uint64) {
+ atomic.AddUint64(&c.value, delta)
+}
+
+// Inc increments the counter by one.
+func (c *Counter) Inc() {
+ c.Add(1)
+}
+
+// Reset resets the counter to 0 and returns its previous value.
+// Does not reset the total value returned from Total.
+func (c *Counter) Reset() uint64 {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ v := atomic.SwapUint64(&c.value, 0)
+ c.total += v
+
+ return v
+}
+
+// Total returns the total counter value.
+func (c *Counter) Total() uint64 {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ return c.total + c.Val()
+}
+
+// Val returns the current counter value.
+func (c *Counter) Val() uint64 {
+ return atomic.LoadUint64(&c.value)
+}