diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/go/parquet/schema/schema.go | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/go/parquet/schema/schema.go')
-rw-r--r-- | src/arrow/go/parquet/schema/schema.go | 328 |
1 files changed, 328 insertions, 0 deletions
diff --git a/src/arrow/go/parquet/schema/schema.go b/src/arrow/go/parquet/schema/schema.go new file mode 100644 index 000000000..773082fe7 --- /dev/null +++ b/src/arrow/go/parquet/schema/schema.go @@ -0,0 +1,328 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package schema provides types and functions for manipulating and building parquet +// file schemas. +// +// Some of the utilities provided include building a schema using Struct Tags +// on a struct type, getting Column Paths from a node, and dealing with the +// converted and logical types for Parquet. +// +// Logical types specify ways to interpret the primitive types allowing the +// number of primitive types to be smaller and reuse efficient encodings. +// For instance a "string" is just a ByteArray column with a UTF-8 annotation +// or "String Logical Type". +// +// For more information about Logical and Converted Types, check: +// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md +package schema + +import ( + "fmt" + "io" + "strings" + + "github.com/apache/arrow/go/v6/parquet" + format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet" + "golang.org/x/xerrors" +) + +// Schema is the container for the converted Parquet schema with a computed +// information from the schema analysis needed for file reading +// +// * Column index to Node +// +// * Max repetition / definition levels for each primitive node +// +// The ColumnDescriptor objects produced by this class can be used to assist in +// the reconstruction of fully materialized data structures from the +// repetition-definition level encoding of nested data +type Schema struct { + root Node + + leaves []*Column + nodeToLeaf map[*PrimitiveNode]int + leafToBase map[int]Node + leafToIndex strIntMultimap +} + +// FromParquet converts a slice of thrift Schema Elements to the correct node type +func FromParquet(elems []*format.SchemaElement) (Node, error) { + if len(elems) == 0 { + return nil, xerrors.New("parquet: empty schema (no root)") + } + + if elems[0].GetNumChildren() == 0 { + if len(elems) > 1 { + return nil, xerrors.New("parquet: schema had multiple nodes but root had no children") + } + // parquet file with no columns + return GroupNodeFromThrift(elems[0], []Node{}) + } + + // We don't check that the root node is repeated since this is not + // consistently set by implementations + var ( + pos = 0 + nextNode func() (Node, error) + ) + + nextNode = func() (Node, error) { + if pos == len(elems) { + return nil, xerrors.New("parquet: malformed schema: not enough elements") + } + + elem := elems[pos] + pos++ + + if elem.GetNumChildren() == 0 { + return PrimitiveNodeFromThrift(elem) + } + + fields := make([]Node, 0, elem.GetNumChildren()) + for i := 0; i < int(elem.GetNumChildren()); i++ { + n, err := nextNode() + if err != nil { + return nil, err + } + fields = append(fields, n) + } + + return GroupNodeFromThrift(elem, fields) + } + + return nextNode() +} + +// Root returns the group node that is the root of this schema +func (s *Schema) Root() *GroupNode { + return s.root.(*GroupNode) +} + +// NumColumns returns the number of leaf nodes that are the actual primitive +// columns in this schema. +func (s *Schema) NumColumns() int { + return len(s.leaves) +} + +// Equals returns true as long as the leaf columns are equal, doesn't take +// into account the groups and only checks whether the schemas are compatible +// at the physical storage level. +func (s *Schema) Equals(rhs *Schema) bool { + if s.NumColumns() != rhs.NumColumns() { + return false + } + + for idx, c := range s.leaves { + if !c.Equals(rhs.Column(idx)) { + return false + } + } + return true +} + +func (s *Schema) buildTree(n Node, maxDefLvl, maxRepLvl int16, base Node) { + switch n.RepetitionType() { + case parquet.Repetitions.Repeated: + maxRepLvl++ + fallthrough + case parquet.Repetitions.Optional: + maxDefLvl++ + } + + switch n := n.(type) { + case *GroupNode: + for _, f := range n.fields { + s.buildTree(f, maxDefLvl, maxRepLvl, base) + } + case *PrimitiveNode: + s.nodeToLeaf[n] = len(s.leaves) + s.leaves = append(s.leaves, NewColumn(n, maxDefLvl, maxRepLvl)) + s.leafToBase[len(s.leaves)-1] = base + s.leafToIndex.Add(n.Path(), len(s.leaves)-1) + } +} + +// Column returns the (0-indexed) column of the provided index. +func (s *Schema) Column(i int) *Column { + return s.leaves[i] +} + +// ColumnIndexByName looks up the column by it's full dot separated +// node path. If there are multiple columns that match, it returns the first one. +// +// Returns -1 if not found. +func (s *Schema) ColumnIndexByName(nodePath string) int { + if search, ok := s.leafToIndex[nodePath]; ok { + return search[0] + } + return -1 +} + +// ColumnIndexByNode returns the index of the column represented by this node. +// +// Returns -1 if not found. +func (s *Schema) ColumnIndexByNode(n Node) int { + if search, ok := s.leafToIndex[n.Path()]; ok { + for _, idx := range search { + if n == s.Column(idx).SchemaNode() { + return idx + } + } + } + return -1 +} + +// ColumnRoot returns the root node of a given column if it is under a +// nested group node, providing that root group node. +func (s *Schema) ColumnRoot(i int) Node { + return s.leafToBase[i] +} + +// HasRepeatedFields returns true if any node in the schema has a repeated field type. +func (s *Schema) HasRepeatedFields() bool { + return s.root.(*GroupNode).HasRepeatedFields() +} + +// UpdateColumnOrders must get a slice that is the same length as the number of leaf columns +// and is used to update the schema metadata Column Orders. len(orders) must equal s.NumColumns() +func (s *Schema) UpdateColumnOrders(orders []parquet.ColumnOrder) error { + if len(orders) != s.NumColumns() { + return xerrors.New("parquet: malformed schema: not enough ColumnOrder values") + } + + visitor := schemaColumnOrderUpdater{orders, 0} + s.root.Visit(&visitor) + return nil +} + +// NewSchema constructs a new Schema object from a root group node. +// +// Any fields with a field-id of -1 will be given an appropriate field number based on their order. +func NewSchema(root *GroupNode) *Schema { + s := &Schema{ + root, + make([]*Column, 0), + make(map[*PrimitiveNode]int), + make(map[int]Node), + make(strIntMultimap), + } + + for _, f := range root.fields { + s.buildTree(f, 0, 0, f) + } + return s +} + +type schemaColumnOrderUpdater struct { + colOrders []parquet.ColumnOrder + leafCount int +} + +func (s *schemaColumnOrderUpdater) VisitPre(n Node) bool { + if n.Type() == Primitive { + leaf := n.(*PrimitiveNode) + leaf.ColumnOrder = s.colOrders[s.leafCount] + s.leafCount++ + } + return true +} + +func (s *schemaColumnOrderUpdater) VisitPost(Node) {} + +type toThriftVisitor struct { + elements []*format.SchemaElement +} + +func (t *toThriftVisitor) VisitPre(n Node) bool { + t.elements = append(t.elements, n.toThrift()) + return true +} + +func (t *toThriftVisitor) VisitPost(Node) {} + +// ToThrift converts a GroupNode to a slice of SchemaElements which is used +// for thrift serialization. +func ToThrift(schema *GroupNode) []*format.SchemaElement { + t := &toThriftVisitor{make([]*format.SchemaElement, 0)} + schema.Visit(t) + return t.elements +} + +type schemaPrinter struct { + w io.Writer + indent int + indentWidth int +} + +func (s *schemaPrinter) VisitPre(n Node) bool { + fmt.Fprint(s.w, strings.Repeat(" ", s.indent)) + if n.Type() == Group { + g := n.(*GroupNode) + fmt.Fprintf(s.w, "%s group field_id=%d %s", g.RepetitionType(), g.FieldID(), g.Name()) + _, invalid := g.logicalType.(UnknownLogicalType) + _, none := g.logicalType.(NoLogicalType) + + if g.logicalType != nil && !invalid && !none { + fmt.Fprintf(s.w, " (%s)", g.logicalType) + } else if g.convertedType != ConvertedTypes.None { + fmt.Fprintf(s.w, " (%s)", g.convertedType) + } + + fmt.Fprintln(s.w, " {") + s.indent += s.indentWidth + } else { + p := n.(*PrimitiveNode) + fmt.Fprintf(s.w, "%s %s field_id=%d %s", p.RepetitionType(), strings.ToLower(p.PhysicalType().String()), p.FieldID(), p.Name()) + _, invalid := p.logicalType.(UnknownLogicalType) + _, none := p.logicalType.(NoLogicalType) + + if p.logicalType != nil && !invalid && !none { + fmt.Fprintf(s.w, " (%s)", p.logicalType) + } else if p.convertedType == ConvertedTypes.Decimal { + fmt.Fprintf(s.w, " (%s(%d,%d))", p.convertedType, p.DecimalMetadata().Precision, p.DecimalMetadata().Scale) + } else if p.convertedType != ConvertedTypes.None { + fmt.Fprintf(s.w, " (%s)", p.convertedType) + } + fmt.Fprintln(s.w, ";") + } + return true +} + +func (s *schemaPrinter) VisitPost(n Node) { + if n.Type() == Group { + s.indent -= s.indentWidth + fmt.Fprint(s.w, strings.Repeat(" ", s.indent)) + fmt.Fprintln(s.w, "}") + } +} + +// PrintSchema writes a string representation of the tree to w using the indent +// width provided. +func PrintSchema(n Node, w io.Writer, indentWidth int) { + n.Visit(&schemaPrinter{w, 0, indentWidth}) +} + +type strIntMultimap map[string][]int + +func (f strIntMultimap) Add(key string, val int) bool { + if _, ok := f[key]; !ok { + f[key] = []int{val} + return false + } + f[key] = append(f[key], val) + return true +} |