diff options
Diffstat (limited to '')
-rw-r--r-- | pkg/com/bulker.go | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/pkg/com/bulker.go b/pkg/com/bulker.go new file mode 100644 index 0000000..5e8de52 --- /dev/null +++ b/pkg/com/bulker.go @@ -0,0 +1,187 @@ +package com + +import ( + "context" + "github.com/icinga/icingadb/pkg/contracts" + "golang.org/x/sync/errgroup" + "sync" + "time" +) + +// BulkChunkSplitPolicy is a state machine which tracks the items of a chunk a bulker assembles. +// A call takes an item for the current chunk into account. +// Output true indicates that the state machine was reset first and the bulker +// shall finish the current chunk now (not e.g. once $size is reached) without the given item. +type BulkChunkSplitPolicy[T any] func(T) bool + +type BulkChunkSplitPolicyFactory[T any] func() BulkChunkSplitPolicy[T] + +// NeverSplit returns a pseudo state machine which never demands splitting. +func NeverSplit[T any]() BulkChunkSplitPolicy[T] { + return neverSplit[T] +} + +// SplitOnDupId returns a state machine which tracks the inputs' IDs. +// Once an already seen input arrives, it demands splitting. +func SplitOnDupId[T contracts.IDer]() BulkChunkSplitPolicy[T] { + seenIds := map[string]struct{}{} + + return func(ider T) bool { + id := ider.ID().String() + + _, ok := seenIds[id] + if ok { + seenIds = map[string]struct{}{id: {}} + } else { + seenIds[id] = struct{}{} + } + + return ok + } +} + +func neverSplit[T any](T) bool { + return false +} + +// Bulker reads all values from a channel and streams them in chunks into a Bulk channel. +type Bulker[T any] struct { + ch chan []T + ctx context.Context + mu sync.Mutex +} + +// NewBulker returns a new Bulker and starts streaming. +func NewBulker[T any]( + ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T], +) *Bulker[T] { + b := &Bulker[T]{ + ch: make(chan []T), + ctx: ctx, + mu: sync.Mutex{}, + } + + go b.run(ch, count, splitPolicyFactory) + + return b +} + +// Bulk returns the channel on which the bulks are delivered. +func (b *Bulker[T]) Bulk() <-chan []T { + return b.ch +} + +func (b *Bulker[T]) run(ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T]) { + defer close(b.ch) + + bufCh := make(chan T, count) + splitPolicy := splitPolicyFactory() + g, ctx := errgroup.WithContext(b.ctx) + + g.Go(func() error { + defer close(bufCh) + + for { + select { + case v, ok := <-ch: + if !ok { + return nil + } + + bufCh <- v + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + g.Go(func() error { + for done := false; !done; { + buf := make([]T, 0, count) + timeout := time.After(256 * time.Millisecond) + + for drain := true; drain && len(buf) < count; { + select { + case v, ok := <-bufCh: + if !ok { + drain = false + done = true + + break + } + + if splitPolicy(v) { + if len(buf) > 0 { + b.ch <- buf + buf = make([]T, 0, count) + } + + timeout = time.After(256 * time.Millisecond) + } + + buf = append(buf, v) + case <-timeout: + drain = false + case <-ctx.Done(): + return ctx.Err() + } + } + + if len(buf) > 0 { + b.ch <- buf + } + + splitPolicy = splitPolicyFactory() + } + + return nil + }) + + // We don't expect an error here. + // We only use errgroup for the encapsulated use of sync.WaitGroup. + _ = g.Wait() +} + +// Bulk reads all values from a channel and streams them in chunks into a returned channel. +func Bulk[T any]( + ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T], +) <-chan []T { + if count <= 1 { + return oneBulk(ctx, ch) + } + + return NewBulker(ctx, ch, count, splitPolicyFactory).Bulk() +} + +// oneBulk operates just as NewBulker(ctx, ch, 1, splitPolicy).Bulk(), +// but without the overhead of the actual bulk creation with a buffer channel, timeout and BulkChunkSplitPolicy. +func oneBulk[T any](ctx context.Context, ch <-chan T) <-chan []T { + out := make(chan []T) + go func() { + defer close(out) + + for { + select { + case item, ok := <-ch: + if !ok { + return + } + + select { + case out <- []T{item}: + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } + }() + + return out +} + +var ( + _ BulkChunkSplitPolicyFactory[struct{}] = NeverSplit[struct{}] + _ BulkChunkSplitPolicyFactory[contracts.Entity] = SplitOnDupId[contracts.Entity] +) |