diff options
Diffstat (limited to 'pkg/icingadb/v1/customvar.go')
-rw-r--r-- | pkg/icingadb/v1/customvar.go | 160 |
1 files changed, 160 insertions, 0 deletions
diff --git a/pkg/icingadb/v1/customvar.go b/pkg/icingadb/v1/customvar.go new file mode 100644 index 0000000..0e85cc0 --- /dev/null +++ b/pkg/icingadb/v1/customvar.go @@ -0,0 +1,160 @@ +package v1 + +import ( + "context" + "fmt" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/flatten" + "github.com/icinga/icingadb/pkg/icingadb/objectpacker" + "github.com/icinga/icingadb/pkg/types" + "github.com/icinga/icingadb/pkg/utils" + "golang.org/x/sync/errgroup" + "runtime" +) + +type Customvar struct { + EntityWithoutChecksum `json:",inline"` + EnvironmentMeta `json:",inline"` + NameMeta `json:",inline"` + Value string `json:"value"` +} + +type CustomvarFlat struct { + CustomvarMeta `json:",inline"` + Flatname string `json:"flatname"` + FlatnameChecksum types.Binary `json:"flatname_checksum"` + Flatvalue string `json:"flatvalue"` +} + +func NewCustomvar() contracts.Entity { + return &Customvar{} +} + +func NewCustomvarFlat() contracts.Entity { + return &CustomvarFlat{} +} + +// ExpandCustomvars streams custom variables from a provided channel and returns three channels, +// the first providing the unmodified custom variable read from the input channel, +// the second channel providing the corresponding resolved flat custom variables, +// and the third channel providing an error, if any. +func ExpandCustomvars( + ctx context.Context, + cvs <-chan contracts.Entity, +) (customvars, flatCustomvars <-chan contracts.Entity, errs <-chan error) { + g, ctx := errgroup.WithContext(ctx) + + // Multiplex cvs to use them both for customvar and customvar_flat. + var forward chan contracts.Entity + customvars, forward = multiplexCvs(ctx, g, cvs) + flatCustomvars = flattenCustomvars(ctx, g, forward) + errs = com.WaitAsync(g) + + return +} + +// multiplexCvs streams custom variables from a provided channel and +// forwards each custom variable to the two returned output channels. +func multiplexCvs( + ctx context.Context, + g *errgroup.Group, + cvs <-chan contracts.Entity, +) (customvars1, customvars2 chan contracts.Entity) { + customvars1 = make(chan contracts.Entity) + customvars2 = make(chan contracts.Entity) + + g.Go(func() error { + defer close(customvars1) + defer close(customvars2) + + for { + select { + case cv, ok := <-cvs: + if !ok { + return nil + } + + select { + case customvars1 <- cv: + case <-ctx.Done(): + return ctx.Err() + } + + select { + case customvars2 <- cv: + case <-ctx.Done(): + return ctx.Err() + } + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + return +} + +// flattenCustomvars creates and yields flat custom variables from the provided custom variables. +func flattenCustomvars(ctx context.Context, g *errgroup.Group, cvs <-chan contracts.Entity) (flatCustomvars chan contracts.Entity) { + flatCustomvars = make(chan contracts.Entity) + + g.Go(func() error { + defer close(flatCustomvars) + + g, ctx := errgroup.WithContext(ctx) + + for i := 0; i < runtime.NumCPU(); i++ { + g.Go(func() error { + for entity := range cvs { + var value interface{} + customvar := entity.(*Customvar) + if err := internal.UnmarshalJSON([]byte(customvar.Value), &value); err != nil { + return err + } + + flattened := flatten.Flatten(value, customvar.Name) + + for flatname, flatvalue := range flattened { + var fv string + if flatvalue == nil { + fv = "null" + } else { + fv = fmt.Sprintf("%v", flatvalue) + } + + select { + case flatCustomvars <- &CustomvarFlat{ + CustomvarMeta: CustomvarMeta{ + EntityWithoutChecksum: EntityWithoutChecksum{ + IdMeta: IdMeta{ + // TODO(el): Schema comment is wrong. + // Without customvar.Id we would produce duplicate keys here. + Id: utils.Checksum(objectpacker.MustPackSlice(customvar.EnvironmentId, customvar.Id, flatname, flatvalue)), + }, + }, + EnvironmentMeta: EnvironmentMeta{ + EnvironmentId: customvar.EnvironmentId, + }, + CustomvarId: customvar.Id, + }, + Flatname: flatname, + FlatnameChecksum: utils.Checksum(flatname), + Flatvalue: fv, + }: + case <-ctx.Done(): + return ctx.Err() + } + } + } + + return nil + }) + } + + return g.Wait() + }) + + return +} |