diff options
Diffstat (limited to 'src/arrow/go/parquet/internal/encoding/decoder.go')
-rw-r--r-- | src/arrow/go/parquet/internal/encoding/decoder.go | 186 |
1 files changed, 186 insertions, 0 deletions
diff --git a/src/arrow/go/parquet/internal/encoding/decoder.go b/src/arrow/go/parquet/internal/encoding/decoder.go new file mode 100644 index 000000000..2a25d10e9 --- /dev/null +++ b/src/arrow/go/parquet/internal/encoding/decoder.go @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package encoding + +import ( + "bytes" + "reflect" + + "github.com/apache/arrow/go/v6/arrow/memory" + "github.com/apache/arrow/go/v6/parquet" + "github.com/apache/arrow/go/v6/parquet/internal/debug" + format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet" + "github.com/apache/arrow/go/v6/parquet/internal/utils" + "github.com/apache/arrow/go/v6/parquet/schema" + "golang.org/x/xerrors" +) + +// DecoderTraits provides an interface for more easily interacting with types +// to generate decoders for specific types. +type DecoderTraits interface { + Decoder(e parquet.Encoding, descr *schema.Column, useDict bool, mem memory.Allocator) TypedDecoder + BytesRequired(int) int +} + +// NewDecoder constructs a decoder for a given type and encoding +func NewDecoder(t parquet.Type, e parquet.Encoding, descr *schema.Column, mem memory.Allocator) TypedDecoder { + traits := getDecodingTraits(t) + if traits == nil { + return nil + } + + return traits.Decoder(e, descr, false /* use dictionary */, mem) +} + +// NewDictDecoder is like NewDecoder but for dictionary encodings, panics if type is bool. +// +// if mem is nil, memory.DefaultAllocator will be used +func NewDictDecoder(t parquet.Type, descr *schema.Column, mem memory.Allocator) DictDecoder { + traits := getDecodingTraits(t) + if traits == nil { + return nil + } + + if mem == nil { + mem = memory.DefaultAllocator + } + + return traits.Decoder(parquet.Encodings.RLEDict, descr, true /* use dictionary */, mem).(DictDecoder) +} + +type decoder struct { + descr *schema.Column + encoding format.Encoding + nvals int + data []byte + typeLen int +} + +// newDecoderBase constructs the base decoding object that is embedded in the +// type specific decoders. +func newDecoderBase(e format.Encoding, descr *schema.Column) decoder { + typeLen := -1 + if descr != nil && descr.PhysicalType() == parquet.Types.FixedLenByteArray { + typeLen = int(descr.TypeLength()) + } + + return decoder{ + descr: descr, + encoding: e, + typeLen: typeLen, + } +} + +// SetData sets the data for decoding into the decoder to update the available +// data bytes and number of values available. +func (d *decoder) SetData(nvals int, data []byte) error { + d.data = data + d.nvals = nvals + return nil +} + +// ValuesLeft returns the number of remaining values that can be decoded +func (d *decoder) ValuesLeft() int { return d.nvals } + +// Encoding returns the encoding type used by this decoder to decode the bytes. +func (d *decoder) Encoding() parquet.Encoding { return parquet.Encoding(d.encoding) } + +type dictDecoder struct { + decoder + mem memory.Allocator + dictValueDecoder utils.DictionaryConverter + idxDecoder *utils.RleDecoder +} + +// SetDict sets a decoder that can be used to decode the dictionary that is +// used for this column in order to return the proper values. +func (d *dictDecoder) SetDict(dict TypedDecoder) { + if dict.Type() != d.descr.PhysicalType() { + panic("parquet: mismatch dictionary and column data type") + } + + d.dictValueDecoder = NewDictConverter(dict) +} + +// SetData sets the index value data into the decoder. +func (d *dictDecoder) SetData(nvals int, data []byte) error { + d.nvals = nvals + if len(data) == 0 { + // no data, bitwidth can safely be 0 + d.idxDecoder = utils.NewRleDecoder(bytes.NewReader(data), 0 /* bitwidth */) + return nil + } + + // grab the bit width from the first byte + width := uint8(data[0]) + if width >= 64 { + return xerrors.New("parquet: invalid or corrupted bit width") + } + + // pass the rest of the data, minus that first byte, to the decoder + d.idxDecoder = utils.NewRleDecoder(bytes.NewReader(data[1:]), int(width)) + return nil +} + +func (d *dictDecoder) decode(out interface{}) (int, error) { + return d.idxDecoder.GetBatchWithDict(d.dictValueDecoder, out) +} + +func (d *dictDecoder) decodeSpaced(out interface{}, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { + return d.idxDecoder.GetBatchWithDictSpaced(d.dictValueDecoder, out, nullCount, validBits, validBitsOffset) +} + +var empty = [1]byte{0} + +// spacedExpand is used to take a slice of data and utilize the bitmap provided to fill in nulls into the +// correct slots according to the bitmap in order to produce a fully expanded result slice with nulls +// in the correct slots. +func spacedExpand(buffer interface{}, nullCount int, validBits []byte, validBitsOffset int64) int { + bufferRef := reflect.ValueOf(buffer) + if bufferRef.Kind() != reflect.Slice { + panic("invalid spacedexpand type, not slice") + } + + var ( + numValues int = bufferRef.Len() + ) + + idxDecode := int64(numValues - nullCount) + if idxDecode == 0 { // if there's nothing to decode there's nothing to do. + return numValues + } + + // read the bitmap in reverse grabbing runs of valid bits where possible. + rdr := utils.NewReverseSetBitRunReader(validBits, validBitsOffset, int64(numValues)) + for { + run := rdr.NextRun() + if run.Length == 0 { + break + } + + // copy data from the end of the slice to it's proper location in the slice after accounting for the nulls + // because we technically don't care what is in the null slots we don't actually have to clean + // up after ourselves because we're doing this in reverse to guarantee that we'll always simply + // overwrite any existing data with the correctly spaced data. Any data that happens to be left in the null + // slots is fine since it shouldn't matter and saves us work. + idxDecode -= run.Length + n := reflect.Copy(bufferRef.Slice(int(run.Pos), bufferRef.Len()), bufferRef.Slice(int(idxDecode), int(int64(idxDecode)+run.Length))) + debug.Assert(n == int(run.Length), "reflect.Copy copied incorrect number of elements in spacedExpand") + } + + return numValues +} |