// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metadata

import (
	"bytes"
	"context"
	"io"
	"reflect"

	"github.com/apache/arrow/go/v6/arrow/memory"
	"github.com/apache/arrow/go/v6/parquet"
	"github.com/apache/arrow/go/v6/parquet/compress"
	"github.com/apache/arrow/go/v6/parquet/internal/encryption"
	format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
	"github.com/apache/arrow/go/v6/parquet/internal/thrift"
	"github.com/apache/arrow/go/v6/parquet/schema"
	"golang.org/x/xerrors"
)

// PageEncodingStats is used for counting the number of pages of specific
// types with the given internal encoding.
type PageEncodingStats struct {
	Encoding parquet.Encoding
	PageType format.PageType
}

type statvalues struct {
	*format.Statistics
}

func (s *statvalues) GetMin() []byte { return s.GetMinValue() }
func (s *statvalues) GetMax() []byte { return s.GetMaxValue() }
func (s *statvalues) IsSetMin() bool { return s.IsSetMinValue() }
func (s *statvalues) IsSetMax() bool { return s.IsSetMaxValue() }

func makeColumnStats(metadata *format.ColumnMetaData, descr *schema.Column, mem memory.Allocator) TypedStatistics {
	if descr.ColumnOrder() == parquet.ColumnOrders.TypeDefinedOrder {
		return NewStatisticsFromEncoded(descr, mem,
			metadata.NumValues-metadata.Statistics.GetNullCount(),
			&statvalues{metadata.Statistics})
	}
	return NewStatisticsFromEncoded(descr, mem,
		metadata.NumValues-metadata.Statistics.GetNullCount(),
		metadata.Statistics)
}

// ColumnChunkMetaData is a proxy around format.ColumnChunkMetaData
// containing all of the information and metadata for a given column chunk
// and its associated Column.
type ColumnChunkMetaData struct {
	column        *format.ColumnChunk
	columnMeta    *format.ColumnMetaData
	decryptedMeta format.ColumnMetaData
	descr         *schema.Column
	writerVersion *AppVersion
	encodings     []parquet.Encoding
	encodingStats []format.PageEncodingStats
	possibleStats TypedStatistics
	mem           memory.Allocator
}

// NewColumnChunkMetaData creates an instance of the metadata from a column chunk and descriptor.
//
// This is primarily used internally or between the subpackages. ColumnChunkMetaDataBuilder should
// be used by consumers instead of using this directly.
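//
// A minimal read-side sketch (all inputs here are hypothetical; the
// *format.ColumnChunk, descriptor, and writer version normally come from
// the file reader internals):
//
//	col, err := metadata.NewColumnChunkMetaData(chunk, descr, writerVersion, 0, 0, nil)
//	if err != nil {
//		return err
//	}
//	fmt.Println(col.NumValues(), col.Compression())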
func NewColumnChunkMetaData(column *format.ColumnChunk, descr *schema.Column, writerVersion *AppVersion, rowGroupOrdinal, columnOrdinal int16, fileDecryptor encryption.FileDecryptor) (*ColumnChunkMetaData, error) {
	c := &ColumnChunkMetaData{
		column:        column,
		columnMeta:    column.GetMetaData(),
		descr:         descr,
		writerVersion: writerVersion,
		mem:           memory.DefaultAllocator,
	}
	if column.IsSetCryptoMetadata() {
		ccmd := column.CryptoMetadata
		if ccmd.IsSetENCRYPTION_WITH_COLUMN_KEY() {
			if fileDecryptor != nil && fileDecryptor.Properties() != nil {
				// we should decrypt the column metadata
				path := parquet.ColumnPath(ccmd.ENCRYPTION_WITH_COLUMN_KEY.GetPathInSchema())
				keyMetadata := ccmd.ENCRYPTION_WITH_COLUMN_KEY.GetKeyMetadata()
				aadColumnMetadata := encryption.CreateModuleAad(fileDecryptor.FileAad(), encryption.ColumnMetaModule, rowGroupOrdinal, columnOrdinal, -1)
				decryptor := fileDecryptor.GetColumnMetaDecryptor(path.String(), string(keyMetadata), aadColumnMetadata)
				thrift.DeserializeThrift(&c.decryptedMeta, decryptor.Decrypt(column.GetEncryptedColumnMetadata()))
				c.columnMeta = &c.decryptedMeta
			} else {
				return nil, xerrors.New("cannot decrypt column metadata. file decryption not setup correctly")
			}
		}
	}

	for _, enc := range c.columnMeta.Encodings {
		c.encodings = append(c.encodings, parquet.Encoding(enc))
	}
	for _, enc := range c.columnMeta.EncodingStats {
		c.encodingStats = append(c.encodingStats, *enc)
	}
	return c, nil
}

// CryptoMetadata returns the cryptographic metadata for how this column was
// encrypted and how to decrypt it.
func (c *ColumnChunkMetaData) CryptoMetadata() *format.ColumnCryptoMetaData {
	return c.column.GetCryptoMetadata()
}

// FileOffset is the location in the file where the column data begins.
func (c *ColumnChunkMetaData) FileOffset() int64 { return c.column.FileOffset }

// FilePath gives the name of the parquet file if provided in the metadata.
func (c *ColumnChunkMetaData) FilePath() string { return c.column.GetFilePath() }

// Type is the physical storage type used in the parquet file for this column chunk.
func (c *ColumnChunkMetaData) Type() parquet.Type { return parquet.Type(c.columnMeta.Type) }

// NumValues is the number of values stored in just this chunk, including nulls.
func (c *ColumnChunkMetaData) NumValues() int64 { return c.columnMeta.NumValues }

// PathInSchema is the full path to this column from the root of the schema, including
// any nested columns.
func (c *ColumnChunkMetaData) PathInSchema() parquet.ColumnPath {
	return c.columnMeta.GetPathInSchema()
}

// Compression provides the type of compression used for this particular chunk.
func (c *ColumnChunkMetaData) Compression() compress.Compression {
	return compress.Compression(c.columnMeta.Codec)
}

// Encodings returns the list of different encodings used in this chunk.
func (c *ColumnChunkMetaData) Encodings() []parquet.Encoding { return c.encodings }

// EncodingStats returns the (page type, encoding) pairs recorded for the
// pages of this chunk, in page order.
func (c *ColumnChunkMetaData) EncodingStats() []PageEncodingStats {
	ret := make([]PageEncodingStats, len(c.encodingStats))
	for idx := range ret {
		ret[idx].Encoding = parquet.Encoding(c.encodingStats[idx].Encoding)
		ret[idx].PageType = c.encodingStats[idx].PageType
	}
	return ret
}

// HasDictionaryPage returns true if there is a dictionary page offset set in
// this metadata.
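//
// For example (a sketch, with col being a *ColumnChunkMetaData obtained earlier):
//
//	if col.HasDictionaryPage() {
//		dictStart := col.DictionaryPageOffset()
//		// the dictionary page at dictStart precedes this chunk's data pages
//	}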
func (c *ColumnChunkMetaData) HasDictionaryPage() bool {
	return c.columnMeta.IsSetDictionaryPageOffset()
}

// DictionaryPageOffset returns the location in the file where the dictionary page starts.
func (c *ColumnChunkMetaData) DictionaryPageOffset() int64 {
	return c.columnMeta.GetDictionaryPageOffset()
}

// DataPageOffset returns the location in the file where the data pages begin for this column.
func (c *ColumnChunkMetaData) DataPageOffset() int64 { return c.columnMeta.GetDataPageOffset() }

// HasIndexPage returns true if the offset for the index page is set in the metadata.
func (c *ColumnChunkMetaData) HasIndexPage() bool { return c.columnMeta.IsSetIndexPageOffset() }

// IndexPageOffset is the location in the file where the index page starts.
func (c *ColumnChunkMetaData) IndexPageOffset() int64 { return c.columnMeta.GetIndexPageOffset() }

// TotalCompressedSize is the size of the actual data in the file; it will be
// equal to TotalUncompressedSize if the data is not compressed.
func (c *ColumnChunkMetaData) TotalCompressedSize() int64 {
	return c.columnMeta.GetTotalCompressedSize()
}

// TotalUncompressedSize is the total size of the raw data after uncompressing the chunk.
func (c *ColumnChunkMetaData) TotalUncompressedSize() int64 {
	return c.columnMeta.GetTotalUncompressedSize()
}

// BloomFilterOffset is the byte offset from the beginning of the file to the bloom
// filter data.
func (c *ColumnChunkMetaData) BloomFilterOffset() int64 {
	return c.columnMeta.GetBloomFilterOffset()
}

// StatsSet returns true only if there are statistics set in the metadata and the column
// descriptor has a sort order that is not SortUnknown.
//
// It also checks the writer version to ensure that the file was not written by a version
// of parquet which is known to have computed statistics incorrectly.
func (c *ColumnChunkMetaData) StatsSet() (bool, error) {
	if !c.columnMeta.IsSetStatistics() || c.descr.SortOrder() == schema.SortUNKNOWN {
		return false, nil
	}

	if c.possibleStats == nil {
		c.possibleStats = makeColumnStats(c.columnMeta, c.descr, c.mem)
	}

	encoded, err := c.possibleStats.Encode()
	if err != nil {
		return false, err
	}

	return c.writerVersion.HasCorrectStatistics(c.Type(), c.descr.LogicalType(), encoded, c.descr.SortOrder()), nil
}

func (c *ColumnChunkMetaData) Equals(other *ColumnChunkMetaData) bool {
	return reflect.DeepEqual(c.columnMeta, other.columnMeta)
}

// Statistics returns the statistics for this chunk, or nil if there are no
// stats in this metadata.
func (c *ColumnChunkMetaData) Statistics() (TypedStatistics, error) {
	ok, err := c.StatsSet()
	if err != nil {
		return nil, err
	}

	if ok {
		return c.possibleStats, nil
	}
	return nil, nil
}

// ColumnChunkMetaDataBuilder is used during writing to construct the metadata
// for a given column chunk, providing a proxy around constructing the actual
// thrift object.
type ColumnChunkMetaDataBuilder struct {
	chunk  *format.ColumnChunk
	props  *parquet.WriterProperties
	column *schema.Column

	compressedSize int64
}

func NewColumnChunkMetaDataBuilder(props *parquet.WriterProperties, column *schema.Column) *ColumnChunkMetaDataBuilder {
	return NewColumnChunkMetaDataBuilderWithContents(props, column, format.NewColumnChunk())
}

// NewColumnChunkMetaDataBuilderWithContents will construct a builder and start it with the provided
// column chunk information rather than with an empty column chunk.
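//
// A sketch of duplicating an existing chunk's metadata (names hypothetical):
//
//	chunk := other.Contents() // *format.ColumnChunk taken from another builder
//	bldr := metadata.NewColumnChunkMetaDataBuilderWithContents(props, column, chunk)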
func NewColumnChunkMetaDataBuilderWithContents(props *parquet.WriterProperties, column *schema.Column, chunk *format.ColumnChunk) *ColumnChunkMetaDataBuilder {
	b := &ColumnChunkMetaDataBuilder{
		props:  props,
		column: column,
		chunk:  chunk,
	}
	b.init(chunk)
	return b
}

// Contents returns the underlying thrift ColumnChunk object so that it can be used
// for constructing or duplicating column metadata.
func (c *ColumnChunkMetaDataBuilder) Contents() *format.ColumnChunk { return c.chunk }

func (c *ColumnChunkMetaDataBuilder) init(chunk *format.ColumnChunk) {
	c.chunk = chunk
	if !c.chunk.IsSetMetaData() {
		c.chunk.MetaData = format.NewColumnMetaData()
	}
	c.chunk.MetaData.Type = format.Type(c.column.PhysicalType())
	c.chunk.MetaData.PathInSchema = schema.ColumnPathFromNode(c.column.SchemaNode())
	c.chunk.MetaData.Codec = format.CompressionCodec(c.props.CompressionFor(c.column.Path()))
}

func (c *ColumnChunkMetaDataBuilder) SetFilePath(val string) {
	c.chunk.FilePath = &val
}

// Descr returns the associated column descriptor for this column chunk.
func (c *ColumnChunkMetaDataBuilder) Descr() *schema.Column { return c.column }

func (c *ColumnChunkMetaDataBuilder) TotalCompressedSize() int64 {
	// if this column is encrypted, Finish sets the MetaData field to nil and
	// stores the compressed size separately, so return the stored value
	if c.chunk.MetaData == nil {
		return c.compressedSize
	}
	return c.chunk.MetaData.GetTotalCompressedSize()
}

func (c *ColumnChunkMetaDataBuilder) SetStats(val EncodedStatistics) {
	c.chunk.MetaData.Statistics = val.ToThrift()
}

// ChunkMetaInfo is a helper struct for passing the offset and size information
// needed to finish building the column chunk metadata.
type ChunkMetaInfo struct {
	NumValues        int64
	DictPageOffset   int64
	IndexPageOffset  int64
	DataPageOffset   int64
	CompressedSize   int64
	UncompressedSize int64
}

// EncodingStats is a helper struct for passing the encoding stat information
// needed to finish up the metadata for a column chunk.
type EncodingStats struct {
	DictEncodingStats map[parquet.Encoding]int32
	DataEncodingStats map[parquet.Encoding]int32
}

// Finish finalizes the metadata with the given offsets,
// flushes any compression that needs to be done, and performs
// any encryption if an encryptor is provided.
func (c *ColumnChunkMetaDataBuilder) Finish(info ChunkMetaInfo, hasDict, dictFallback bool, encStats EncodingStats, metaEncryptor encryption.Encryptor) error {
	if info.DictPageOffset > 0 {
		c.chunk.MetaData.DictionaryPageOffset = &info.DictPageOffset
		c.chunk.FileOffset = info.DictPageOffset + info.CompressedSize
	} else {
		c.chunk.FileOffset = info.DataPageOffset + info.CompressedSize
	}

	c.chunk.MetaData.NumValues = info.NumValues
	if info.IndexPageOffset >= 0 {
		c.chunk.MetaData.IndexPageOffset = &info.IndexPageOffset
	}

	c.chunk.MetaData.DataPageOffset = info.DataPageOffset
	c.chunk.MetaData.TotalUncompressedSize = info.UncompressedSize
	c.chunk.MetaData.TotalCompressedSize = info.CompressedSize
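	// For illustration (hypothetical numbers): with a dictionary page starting
	// at byte 4 and 1000 bytes of total compressed data, the logic above yields
	// FileOffset = 4 + 1000 = 1004, the first byte past this chunk's data.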
	// no matter the configuration, the maximum number of thrift encodings we'll
	// populate is going to be 3:
	//  1. potential dictionary index encoding
	//  2. page encoding
	//  3. RLE for repetition and definition levels
	// so preallocate a capacity of 3 but initialize the slice with a length of 0
	const maxEncodings = 3

	thriftEncodings := make([]format.Encoding, 0, maxEncodings)
	if hasDict {
		thriftEncodings = append(thriftEncodings, format.Encoding(c.props.DictionaryIndexEncoding()))
		if c.props.Version() == parquet.V1_0 {
			thriftEncodings = append(thriftEncodings, format.Encoding_PLAIN)
		} else {
			thriftEncodings = append(thriftEncodings, format.Encoding(c.props.DictionaryPageEncoding()))
		}
	} else { // no dictionary
		thriftEncodings = append(thriftEncodings, format.Encoding(c.props.EncodingFor(c.column.Path())))
	}
	thriftEncodings = append(thriftEncodings, format.Encoding(parquet.Encodings.RLE))
	// Only PLAIN encoding is supported for fallback in V1
	// TODO(zeroshade): Use user specified encoding for V2
	if dictFallback {
		thriftEncodings = append(thriftEncodings, format.Encoding_PLAIN)
	}
	c.chunk.MetaData.Encodings = thriftEncodings

	thriftEncodingStats := make([]*format.PageEncodingStats, 0, len(encStats.DictEncodingStats)+len(encStats.DataEncodingStats))
	for k, v := range encStats.DictEncodingStats {
		thriftEncodingStats = append(thriftEncodingStats, &format.PageEncodingStats{
			PageType: format.PageType_DICTIONARY_PAGE,
			Encoding: format.Encoding(k),
			Count:    v,
		})
	}
	for k, v := range encStats.DataEncodingStats {
		thriftEncodingStats = append(thriftEncodingStats, &format.PageEncodingStats{
			PageType: format.PageType_DATA_PAGE,
			Encoding: format.Encoding(k),
			Count:    v,
		})
	}
	c.chunk.MetaData.EncodingStats = thriftEncodingStats

	encryptProps := c.props.ColumnEncryptionProperties(c.column.Path())
	if encryptProps != nil && encryptProps.IsEncrypted() {
		ccmd := format.NewColumnCryptoMetaData()
		if encryptProps.IsEncryptedWithFooterKey() {
			ccmd.ENCRYPTION_WITH_FOOTER_KEY = format.NewEncryptionWithFooterKey()
		} else {
			ccmd.ENCRYPTION_WITH_COLUMN_KEY = &format.EncryptionWithColumnKey{
				KeyMetadata:  []byte(encryptProps.KeyMetadata()),
				PathInSchema: c.column.ColumnPath(),
			}
		}
		c.chunk.CryptoMetadata = ccmd

		encryptedFooter := c.props.FileEncryptionProperties().EncryptedFooter()
		encryptMetadata := !encryptedFooter || !encryptProps.IsEncryptedWithFooterKey()
		if encryptMetadata {
			// Serialize and encrypt the ColumnMetaData separately:
			// thrift-serialize the structure, encrypt it with the column key,
			// and write it to the encrypted_column_metadata field.
			serializer := thrift.NewThriftSerializer()
			data, err := serializer.Write(context.Background(), c.chunk.MetaData)
			if err != nil {
				return err
			}

			var buf bytes.Buffer
			metaEncryptor.Encrypt(&buf, data)
			c.chunk.EncryptedColumnMetadata = buf.Bytes()

			if encryptedFooter {
				c.compressedSize = c.chunk.MetaData.GetTotalCompressedSize()
				c.chunk.MetaData = nil
			} else {
				// Keep a redacted version of the metadata for old readers
				c.chunk.MetaData.Statistics = nil
				c.chunk.MetaData.EncodingStats = nil
			}
		}
	}
	return nil
}

// WriteTo will always return 0 for the int64 since the thrift writer library
// does not report the number of bytes written; the (int64, error) signature
// exists only to match the standard WriteTo interfaces.
func (c *ColumnChunkMetaDataBuilder) WriteTo(w io.Writer) (int64, error) {
	return 0, thrift.SerializeThriftStream(c.chunk, w)
}
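
// A minimal write-side sketch tying the builder together (all inputs here are
// hypothetical; in practice the file's column writers drive these calls):
//
//	bldr := metadata.NewColumnChunkMetaDataBuilder(props, column)
//	err := bldr.Finish(metadata.ChunkMetaInfo{
//		NumValues:        100,
//		DictPageOffset:   -1, // no dictionary page written
//		IndexPageOffset:  -1, // no index page written
//		DataPageOffset:   4,
//		CompressedSize:   512,
//		UncompressedSize: 1024,
//	}, false /* hasDict */, false /* dictFallback */, metadata.EncodingStats{}, nil)
//	if err == nil {
//		_, err = bldr.WriteTo(sink) // thrift-serialize the finished chunk
//	}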