From b09c6d56832eb1718c07d74abf3bc6ae3fe4e030 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 28 Apr 2024 14:36:04 +0200 Subject: Adding upstream version 1.1.0. Signed-off-by: Daniel Baumann --- pkg/icingaredis/utils.go | 128 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100644 pkg/icingaredis/utils.go (limited to 'pkg/icingaredis/utils.go') diff --git a/pkg/icingaredis/utils.go b/pkg/icingaredis/utils.go new file mode 100644 index 0000000..9176dba --- /dev/null +++ b/pkg/icingaredis/utils.go @@ -0,0 +1,128 @@ +package icingaredis + +import ( + "context" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" +) + +// Streams represents a Redis stream key to ID mapping. +type Streams map[string]string + +// Option returns the Redis stream key to ID mapping +// as a slice of stream keys followed by their IDs +// that is compatible for the Redis STREAMS option. +func (s Streams) Option() []string { + // len*2 because we're appending the IDs later. + streams := make([]string, 0, len(s)*2) + ids := make([]string, 0, len(s)) + + for key, id := range s { + streams = append(streams, key) + ids = append(ids, id) + } + + return append(streams, ids...) +} + +// CreateEntities streams and creates entities from the +// given Redis field value pairs using the specified factory function, +// and streams them on a returned channel. +func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, pairs <-chan HPair, concurrent int) (<-chan contracts.Entity, <-chan error) { + entities := make(chan contracts.Entity) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + defer close(entities) + + g, ctx := errgroup.WithContext(ctx) + + for i := 0; i < concurrent; i++ { + g.Go(func() error { + for pair := range pairs { + var id types.Binary + + if err := id.UnmarshalText([]byte(pair.Field)); err != nil { + return errors.Wrapf(err, "can't create ID from value %#v", pair.Field) + } + + e := factoryFunc() + if err := internal.UnmarshalJSON([]byte(pair.Value), e); err != nil { + return err + } + e.SetID(id) + + select { + case entities <- e: + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil + }) + } + + return g.Wait() + }) + + return entities, com.WaitAsync(g) +} + +// SetChecksums concurrently streams from the given entities and +// sets their checksums using the specified map and +// streams the results on a returned channel. +func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums map[string]contracts.Entity, concurrent int) (<-chan contracts.Entity, <-chan error) { + entitiesWithChecksum := make(chan contracts.Entity) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + defer close(entitiesWithChecksum) + + g, ctx := errgroup.WithContext(ctx) + + for i := 0; i < concurrent; i++ { + g.Go(func() error { + for entity := range entities { + if checksumer, ok := checksums[entity.ID().String()]; ok { + entity.(contracts.Checksumer).SetChecksum(checksumer.(contracts.Checksumer).Checksum()) + } else { + return errors.Errorf("no checksum for %#v", entity) + } + + select { + case entitiesWithChecksum <- entity: + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil + }) + } + + return g.Wait() + }) + + return entitiesWithChecksum, com.WaitAsync(g) +} + +// WrapCmdErr adds the command itself and +// the stack of the current goroutine to the command's error if any. +func WrapCmdErr(cmd redis.Cmder) error { + err := cmd.Err() + if err != nil { + err = errors.Wrapf(err, "can't perform %q", utils.Ellipsize( + redis.NewCmd(context.Background(), cmd.Args()).String(), // Omits error in opposite to cmd.String() + 100, + )) + } + + return err +} -- cgit v1.2.3