// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metadata

import (
	"bytes"
	"context"
	"io"
	"reflect"
	"unicode/utf8"

	"github.com/apache/arrow/go/v6/parquet"
	"github.com/apache/arrow/go/v6/parquet/compress"
	"github.com/apache/arrow/go/v6/parquet/internal/encryption"
	format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
	"github.com/apache/arrow/go/v6/parquet/internal/thrift"
	"github.com/apache/arrow/go/v6/parquet/schema"
	"golang.org/x/xerrors"
)

// DefaultCompressionType is used unless a different compression is specified
// in the properties
var DefaultCompressionType = compress.Codecs.Uncompressed

// FileMetaDataBuilder is a proxy for more easily constructing file metadata
// particularly used when writing a file out.
//
// Row groups are added with AppendRowGroup and the final FileMetaData is
// produced by Finish.
type FileMetaDataBuilder struct {
	// metadata is the thrift footer being populated; Finish fills it in and
	// then replaces it with a fresh, empty instance.
	metadata *format.FileMetaData
	// props supplies the format version, created-by string and encryption
	// settings consulted while building.
	props *parquet.WriterProperties
	// schema determines the number of column orders and the thrift schema
	// elements written to the footer.
	schema *schema.Schema
	// rowGroups gets one entry per AppendRowGroup call and is moved into
	// metadata by Finish.
	rowGroups []*format.RowGroup
	// currentRgBldr is the builder returned by the most recent AppendRowGroup.
	currentRgBldr *RowGroupMetaDataBuilder
	// kvmeta is copied into the footer's key/value metadata by Finish.
	kvmeta KeyValueMetadata
	// cryptoMetadata is non-nil only when the footer itself is encrypted.
	cryptoMetadata *format.FileCryptoMetaData
}

// NewFileMetadataBuilder will use the default writer properties if nil is passed for
// the writer properties and nil is allowable for the key value metadata.
func NewFileMetadataBuilder(schema *schema.Schema, props *parquet.WriterProperties, kvmeta KeyValueMetadata) *FileMetaDataBuilder { var crypto *format.FileCryptoMetaData if props.FileEncryptionProperties() != nil && props.FileEncryptionProperties().EncryptedFooter() { crypto = format.NewFileCryptoMetaData() } return &FileMetaDataBuilder{ metadata: format.NewFileMetaData(), props: props, schema: schema, kvmeta: kvmeta, cryptoMetadata: crypto, } } // GetFileCryptoMetaData returns the cryptographic information for encrypting/ // decrypting the file. func (f *FileMetaDataBuilder) GetFileCryptoMetaData() *FileCryptoMetadata { if f.cryptoMetadata == nil { return nil } props := f.props.FileEncryptionProperties() f.cryptoMetadata.EncryptionAlgorithm = props.Algorithm().ToThrift() keyMetadata := props.FooterKeyMetadata() if keyMetadata != "" { f.cryptoMetadata.KeyMetadata = []byte(keyMetadata) } return &FileCryptoMetadata{f.cryptoMetadata, 0} } // AppendRowGroup adds a rowgroup to the list and returns a builder // for that row group func (f *FileMetaDataBuilder) AppendRowGroup() *RowGroupMetaDataBuilder { if f.rowGroups == nil { f.rowGroups = make([]*format.RowGroup, 0, 1) } rg := format.NewRowGroup() f.rowGroups = append(f.rowGroups, rg) f.currentRgBldr = NewRowGroupMetaDataBuilder(f.props, f.schema, rg) return f.currentRgBldr } // Finish will finalize the metadata of the number of rows, row groups, // version etc. This will clear out this filemetadatabuilder so it can // be re-used func (f *FileMetaDataBuilder) Finish() (*FileMetaData, error) { totalRows := int64(0) for _, rg := range f.rowGroups { totalRows += rg.NumRows } f.metadata.NumRows = totalRows f.metadata.RowGroups = f.rowGroups switch f.props.Version() { case parquet.V1_0: f.metadata.Version = 1 default: f.metadata.Version = 2 } createdBy := f.props.CreatedBy() f.metadata.CreatedBy = &createdBy // Users cannot set the `ColumnOrder` since we do not not have user defined sort order // in the spec yet. 
// // We always default to `TYPE_DEFINED_ORDER`. We can expose it in // the API once we have user defined sort orders in the Parquet format. // TypeDefinedOrder implies choose SortOrder based on ConvertedType/PhysicalType typeDefined := format.NewTypeDefinedOrder() colOrder := &format.ColumnOrder{TYPE_ORDER: typeDefined} f.metadata.ColumnOrders = make([]*format.ColumnOrder, f.schema.NumColumns()) for idx := range f.metadata.ColumnOrders { f.metadata.ColumnOrders[idx] = colOrder } encryptProps := f.props.FileEncryptionProperties() if encryptProps != nil && !encryptProps.EncryptedFooter() { var signingAlgo parquet.Algorithm algo := encryptProps.Algorithm() signingAlgo.Aad.AadFileUnique = algo.Aad.AadFileUnique signingAlgo.Aad.SupplyAadPrefix = algo.Aad.SupplyAadPrefix if !algo.Aad.SupplyAadPrefix { signingAlgo.Aad.AadPrefix = algo.Aad.AadPrefix } signingAlgo.Algo = parquet.AesGcm f.metadata.EncryptionAlgorithm = signingAlgo.ToThrift() footerSigningMetadata := f.props.FileEncryptionProperties().FooterKeyMetadata() if footerSigningMetadata != "" { f.metadata.FooterSigningKeyMetadata = []byte(footerSigningMetadata) } } f.metadata.Schema = schema.ToThrift(f.schema.Root()) f.metadata.KeyValueMetadata = f.kvmeta out := &FileMetaData{ FileMetaData: f.metadata, version: NewAppVersion(f.metadata.GetCreatedBy()), } if err := out.initSchema(); err != nil { return nil, err } out.initColumnOrders() f.metadata = format.NewFileMetaData() f.rowGroups = nil return out, nil } // KeyValueMetadata is an alias for a slice of thrift keyvalue pairs. // // It is presumed that the metadata should all be utf8 valid. type KeyValueMetadata []*format.KeyValue // NewKeyValueMetadata is equivalent to make(KeyValueMetadata, 0) func NewKeyValueMetadata() KeyValueMetadata { return make(KeyValueMetadata, 0) } // Append adds the passed in key and value to the metadata, if either contains // any invalid utf8 runes, then it is not added and an error is returned. 
func (k *KeyValueMetadata) Append(key, value string) error {
	if !utf8.ValidString(key) || !utf8.ValidString(value) {
		// %q escapes the offending bytes so the error text itself stays valid utf8.
		return xerrors.Errorf("metadata must be valid utf8 strings, got key = %q and value = %q", key, value)
	}
	*k = append(*k, &format.KeyValue{Key: key, Value: &value})
	return nil
}

// Len returns the number of key/value pairs.
func (k KeyValueMetadata) Len() int { return len(k) }

// Equals compares all of the metadata keys and values to check they are equal.
//
// The comparison uses reflect.DeepEqual, so a nil KeyValueMetadata is not
// considered equal to an empty, non-nil one.
func (k KeyValueMetadata) Equals(other KeyValueMetadata) bool {
	return reflect.DeepEqual(k, other)
}

// Keys returns the keys in insertion order.
func (k KeyValueMetadata) Keys() (ret []string) {
	ret = make([]string, len(k))
	for idx, v := range k {
		ret[idx] = v.GetKey()
	}
	return
}

// Values returns the values in insertion order, parallel to Keys.
func (k KeyValueMetadata) Values() (ret []string) {
	ret = make([]string, len(k))
	for idx, v := range k {
		ret[idx] = v.GetValue()
	}
	return
}

// FindValue returns the value of the first pair whose key equals key, or nil
// if there is no such pair. The returned pointer aliases the stored value.
func (k KeyValueMetadata) FindValue(key string) *string {
	for _, v := range k {
		if v.Key == key {
			return v.Value
		}
	}
	return nil
}

// FileMetaData is a proxy around the underlying thrift FileMetaData object
// to make it easier to use and interact with.
type FileMetaData struct {
	*format.FileMetaData
	// Schema is built from the thrift schema elements by initSchema.
	Schema *schema.Schema
	// FileDecryptor, when non-nil, decrypts the footer and is passed along to
	// row group metadata.
	FileDecryptor encryption.FileDecryptor

	// app version of the writer for this file
	version *AppVersion
	// size of the raw bytes of the metadata in the file which were
	// decoded by thrift, Size() getter returns the value.
	metadataLen int
}

// NewFileMetaData takes in the raw bytes of the serialized metadata to deserialize
// and will attempt to decrypt the footer if a decryptor is provided.
//
// An error is returned if the (possibly decrypted) bytes cannot be
// deserialized or if the schema they describe cannot be converted.
func NewFileMetaData(data []byte, fileDecryptor encryption.FileDecryptor) (*FileMetaData, error) {
	meta := format.NewFileMetaData()
	if fileDecryptor != nil {
		footerDecryptor := fileDecryptor.GetFooterDecryptor()
		data = footerDecryptor.Decrypt(data)
	}

	remain, err := thrift.DeserializeThrift(meta, data)
	if err != nil {
		return nil, err
	}

	f := &FileMetaData{
		FileMetaData:  meta,
		version:       NewAppVersion(meta.GetCreatedBy()),
		metadataLen:   len(data) - int(remain),
		FileDecryptor: fileDecryptor,
	}
	// initColumnOrders dereferences f.Schema, so a schema failure must be
	// reported here rather than silently dropped.
	if err = f.initSchema(); err != nil {
		return nil, err
	}
	f.initColumnOrders()
	return f, nil
}

// Size is the length of the raw serialized metadata bytes in the footer
func (f *FileMetaData) Size() int { return f.metadataLen }

// NumSchemaElements is the length of the flattened schema list in the thrift
func (f *FileMetaData) NumSchemaElements() int {
	return len(f.FileMetaData.Schema)
}

// RowGroup provides the metadata for the (0-based) index of the row group.
// It panics if i is out of range.
func (f *FileMetaData) RowGroup(i int) *RowGroupMetaData {
	return &RowGroupMetaData{
		f.RowGroups[i], f.Schema, f.version, f.FileDecryptor,
	}
}

// Serialize encodes the underlying thrift FileMetaData to bytes.
func (f *FileMetaData) Serialize(ctx context.Context) ([]byte, error) {
	return thrift.NewThriftSerializer().Write(ctx, f.FileMetaData)
}

// SerializeString encodes the underlying thrift FileMetaData to a string.
func (f *FileMetaData) SerializeString(ctx context.Context) (string, error) {
	return thrift.NewThriftSerializer().WriteString(ctx, f.FileMetaData)
}

// EncryptionAlgorithm constructs the algorithm object from the thrift
// information or returns an empty instance if it was not set.
func (f *FileMetaData) EncryptionAlgorithm() parquet.Algorithm {
	if f.IsSetEncryptionAlgorithm() {
		return parquet.AlgorithmFromThrift(f.GetEncryptionAlgorithm())
	}
	return parquet.Algorithm{}
}

// initSchema rebuilds f.Schema from the flattened thrift schema elements.
func (f *FileMetaData) initSchema() error {
	root, err := schema.FromParquet(f.FileMetaData.Schema)
	if err != nil {
		return err
	}
	f.Schema = schema.NewSchema(root.(*schema.GroupNode))
	return nil
}

// initColumnOrders pushes the footer's column orders into f.Schema. When the
// footer carries no column orders, every column is marked Undefined.
// Requires f.Schema to be non-nil.
func (f *FileMetaData) initColumnOrders() {
	numCols := f.Schema.NumColumns()
	orders := make([]parquet.ColumnOrder, 0, numCols)
	if f.IsSetColumnOrders() {
		for _, o := range f.GetColumnOrders() {
			if o.IsSetTYPE_ORDER() {
				orders = append(orders, parquet.ColumnOrders.TypeDefinedOrder)
			} else {
				orders = append(orders, parquet.ColumnOrders.Undefined)
			}
		}
	} else {
		// A plain fill also covers a zero-column schema, where indexing
		// orders[0] would panic.
		for i := 0; i < numCols; i++ {
			orders = append(orders, parquet.ColumnOrders.Undefined)
		}
	}
	f.Schema.UpdateColumnOrders(orders)
}

// WriterVersion returns the constructed application version from the
// created by string, computing and caching it on first use if needed.
func (f *FileMetaData) WriterVersion() *AppVersion {
	if f.version == nil {
		f.version = NewAppVersion(f.GetCreatedBy())
	}
	return f.version
}

// SetFilePath will set the file path into all of the columns in each row group.
// All column chunks end up sharing the same *string.
func (f *FileMetaData) SetFilePath(path string) {
	for _, rg := range f.RowGroups {
		for _, chunk := range rg.Columns {
			chunk.FilePath = &path
		}
	}
}

// AppendRowGroups will add all of the rowgroup metadata from other to the
// current file metadata and add their row counts to NumRows. The schemas
// must be equal.
func (f *FileMetaData) AppendRowGroups(other *FileMetaData) error {
	if !f.Schema.Equals(other.Schema) {
		return xerrors.New("parquet/FileMetaData: AppendRowGroups requires equal schemas")
	}

	f.RowGroups = append(f.RowGroups, other.GetRowGroups()...)
	for _, rg := range other.GetRowGroups() {
		f.NumRows += rg.NumRows
	}
	return nil
}

// Subset will construct a new FileMetaData object containing only the requested
// row groups by index. An error is returned if any index is negative or not
// less than the number of row groups. The row group objects are shared with f,
// not copied.
func (f *FileMetaData) Subset(rowGroups []int) (*FileMetaData, error) {
	for _, i := range rowGroups {
		// Reject negative indices too; they would otherwise panic below.
		if i < 0 || i >= len(f.RowGroups) {
			return nil, xerrors.Errorf("parquet: this file only has %d row groups, but requested a subset including row group: %d", len(f.RowGroups), i)
		}
	}

	out := &FileMetaData{
		FileMetaData: &format.FileMetaData{
			Schema:                   f.FileMetaData.Schema,
			CreatedBy:                f.CreatedBy,
			ColumnOrders:             f.GetColumnOrders(),
			EncryptionAlgorithm:      f.FileMetaData.EncryptionAlgorithm,
			FooterSigningKeyMetadata: f.FooterSigningKeyMetadata,
			Version:                  f.FileMetaData.Version,
			KeyValueMetadata:         f.KeyValueMetadata(),
		},
		Schema:        f.Schema,
		FileDecryptor: f.FileDecryptor,
		version:       f.version,
	}

	out.RowGroups = make([]*format.RowGroup, 0, len(rowGroups))
	for _, selected := range rowGroups {
		out.RowGroups = append(out.RowGroups, f.RowGroups[selected])
		out.NumRows += f.RowGroups[selected].GetNumRows()
	}

	return out, nil
}

// Equals reports whether the underlying thrift metadata of f and other is
// deeply equal.
func (f *FileMetaData) Equals(other *FileMetaData) bool {
	return reflect.DeepEqual(f.FileMetaData, other.FileMetaData)
}

// KeyValueMetadata returns the footer's key/value metadata.
func (f *FileMetaData) KeyValueMetadata() KeyValueMetadata {
	return f.GetKeyValueMetadata()
}

// VerifySignature constructs a cryptographic signature using the FileDecryptor
// of the footer and then verifies its integrity.
// // Panics if f.FileDecryptor is nil func (f *FileMetaData) VerifySignature(signature []byte) bool { if f.FileDecryptor == nil { panic("decryption not set propertly, cannot verify signature") } serializer := thrift.NewThriftSerializer() data, _ := serializer.Write(context.Background(), f.FileMetaData) nonce := signature[:encryption.NonceLength] tag := signature[encryption.NonceLength : encryption.NonceLength+encryption.GcmTagLength] key := f.FileDecryptor.GetFooterKey() aad := encryption.CreateFooterAad(f.FileDecryptor.FileAad()) enc := encryption.NewAesEncryptor(f.FileDecryptor.Algorithm(), true) var buf bytes.Buffer buf.Grow(enc.CiphertextSizeDelta() + len(data)) encryptedLen := enc.SignedFooterEncrypt(&buf, data, []byte(key), []byte(aad), nonce) return bytes.Equal(buf.Bytes()[encryptedLen-encryption.GcmTagLength:], tag) } // WriteTo will serialize and write out this file metadata, encrypting it if // appropriate. // // If it is an encrypted file with a plaintext footer, then we will write the // signature with the unencrypted footer. 
func (f *FileMetaData) WriteTo(w io.Writer, encryptor encryption.Encryptor) (int64, error) {
	serializer := thrift.NewThriftSerializer()
	// only in encrypted files with plaintext footers, the encryption algorithm is set in the footer
	if f.IsSetEncryptionAlgorithm() {
		data, err := serializer.Write(context.Background(), f.FileMetaData)
		if err != nil {
			return 0, err
		}

		// Encrypt the serialized footer. The ciphertext itself is discarded;
		// only the nonce and GCM tag from the result are written out, as the
		// signature that follows the plaintext footer.
		var buf bytes.Buffer
		buf.Grow(encryptor.CiphertextSizeDelta() + len(data))
		encryptedLen := encryptor.Encrypt(&buf, data)

		// wrote tracks the total bytes written so partial writes are reported.
		wrote := 0
		n := 0
		// write unencrypted footer
		if n, err = w.Write(data); err != nil {
			return int64(n), err
		}
		wrote += n
		// write signature (nonce and tag)
		// skip the leading 4 bytes of the encrypted buffer (presumably the
		// length prefix written by Encrypt -- confirm against the encryptor)
		buf.Next(4)
		if n, err = w.Write(buf.Next(encryption.NonceLength)); err != nil {
			return int64(wrote + n), err
		}
		wrote += n
		// skip the ciphertext body so that the next read yields the GCM tag
		buf.Next(encryptedLen - 4 - encryption.NonceLength - encryption.GcmTagLength)
		n, err = w.Write(buf.Next(encryption.GcmTagLength))
		return int64(wrote + n), err
	}
	// plaintext file or encrypted footer: the serializer handles encryption
	// using the supplied encryptor.
	n, err := serializer.Serialize(f.FileMetaData, w, encryptor)
	return int64(n), err
}

// Version returns the "version" of the file
//
// WARNING: The value returned by this method is unreliable as 1) the
// parquet file metadata stores the version as a single integer and
// 2) some producers are known to always write a hardcoded value. Therefore
// you cannot use this value to know which features are used in the file.
func (f *FileMetaData) Version() parquet.Version {
	switch f.FileMetaData.Version {
	case 1:
		return parquet.V1_0
	case 2:
		return parquet.V2_LATEST
	default:
		// improperly set version, assume parquet 1.0
		return parquet.V1_0
	}
}

// FileCryptoMetadata is a proxy for the thrift fileCryptoMetadata object
type FileCryptoMetadata struct {
	// metadata is the underlying thrift object.
	metadata *format.FileCryptoMetaData
	// cryptoMetadataLen is the number of bytes consumed when deserializing;
	// it is 0 for objects built in memory.
	cryptoMetadataLen uint32
}

// NewFileCryptoMetaData takes in the raw serialized bytes to deserialize
// storing the number of bytes that were actually deserialized.
func NewFileCryptoMetaData(metadata []byte) (ret FileCryptoMetadata, err error) {
	parsed := format.NewFileCryptoMetaData()

	// leftover is the count of trailing bytes thrift did not consume.
	var leftover uint64
	leftover, err = thrift.DeserializeThrift(parsed, metadata)

	consumed := uint64(len(metadata)) - leftover
	ret = FileCryptoMetadata{
		metadata:          parsed,
		cryptoMetadataLen: uint32(consumed),
	}
	return ret, err
}

// WriteTo writes out the serialized crypto metadata to w, unencrypted.
func (fc FileCryptoMetadata) WriteTo(w io.Writer) (int64, error) {
	written, err := thrift.NewThriftSerializer().Serialize(fc.metadata, w, nil)
	return int64(written), err
}

// Len is the number of bytes that were deserialized to create this object
func (fc FileCryptoMetadata) Len() int {
	return int(fc.cryptoMetadataLen)
}

// KeyMetadata returns the footer key metadata stored in the thrift object.
func (fc FileCryptoMetadata) KeyMetadata() []byte {
	return fc.metadata.KeyMetadata
}

// EncryptionAlgorithm constructs the object from the thrift instance of
// the encryption algorithm
func (fc FileCryptoMetadata) EncryptionAlgorithm() parquet.Algorithm {
	thriftAlgo := fc.metadata.GetEncryptionAlgorithm()
	return parquet.AlgorithmFromThrift(thriftAlgo)
}