// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package parquet import ( "github.com/apache/arrow/go/v6/arrow/memory" "github.com/apache/arrow/go/v6/parquet/compress" ) // Constants for default property values used for the default reader, writer and column props. const ( // Default Buffer size used for the Reader DefaultBufSize int64 = 4096 * 4 // Default data page size limit is 1K it's not guaranteed, but we will try to // cut data pages off at this size where possible. DefaultDataPageSize int64 = 1024 * 1024 // Default is for dictionary encoding to be turned on, use WithDictionaryDefault // writer property to change that. DefaultDictionaryEnabled = true // If the dictionary reaches the size of this limitation, the writer will use // the fallback encoding (usually plain) instead of continuing to build the // dictionary index. DefaultDictionaryPageSizeLimit = DefaultDataPageSize // In order to attempt to facilitate data page size limits for writing, // data is written in batches. Increasing the batch size may improve performance // but the larger the batch size, the easier it is to overshoot the datapage limit. DefaultWriteBatchSize int64 = 1024 // Default maximum number of rows for a single row group DefaultMaxRowGroupLen int64 = 64 * 1024 * 1024 // Default is to have stats enabled for all columns, use writer properties to // change the default, or to enable/disable for specific columns. DefaultStatsEnabled = true // If the stats are larger than 4K the writer will skip writing them out anyways. DefaultMaxStatsSize int64 = 4096 DefaultCreatedBy = "parquet-go version 1.0.0" ) // ColumnProperties defines the encoding, codec, and so on for a given column. type ColumnProperties struct { Encoding Encoding Codec compress.Compression DictionaryEnabled bool StatsEnabled bool MaxStatsSize int64 CompressionLevel int } // DefaultColumnProperties returns the default properties which get utilized for writing. // // The default column properties are the following constants: // Encoding: Encodings.Plain // Codec: compress.Codecs.Uncompressed // DictionaryEnabled: DefaultDictionaryEnabled // StatsEnabled: DefaultStatsEnabled // MaxStatsSize: DefaultMaxStatsSize // CompressionLevel: compress.DefaultCompressionLevel func DefaultColumnProperties() ColumnProperties { return ColumnProperties{ Encoding: Encodings.Plain, Codec: compress.Codecs.Uncompressed, DictionaryEnabled: DefaultDictionaryEnabled, StatsEnabled: DefaultStatsEnabled, MaxStatsSize: DefaultMaxStatsSize, CompressionLevel: compress.DefaultCompressionLevel, } } type writerPropConfig struct { wr *WriterProperties encodings map[string]Encoding codecs map[string]compress.Compression compressLevel map[string]int dictEnabled map[string]bool statsEnabled map[string]bool } // WriterProperty is used as the options for building a writer properties instance type WriterProperty func(*writerPropConfig) // WithAllocator specifies the writer to use the given allocator func WithAllocator(mem memory.Allocator) WriterProperty { return func(cfg *writerPropConfig) { cfg.wr.mem = mem } } // WithDictionaryDefault sets the default value for whether to enable dictionary encoding func WithDictionaryDefault(dict bool) WriterProperty { return func(cfg *writerPropConfig) { cfg.wr.defColumnProps.DictionaryEnabled = dict } } // WithDictionaryFor allows enabling or disabling dictionary encoding for a given column path string func WithDictionaryFor(path string, dict bool) WriterProperty { return func(cfg *writerPropConfig) { cfg.dictEnabled[path] = true } } // WithDictionaryPath is like WithDictionaryFor, but takes a ColumnPath type func WithDictionaryPath(path ColumnPath, dict bool) WriterProperty { return WithDictionaryFor(path.String(), dict) } // WithDictionaryPageSizeLimit is the limit of the dictionary at which the writer // will fallback to plain encoding instead func WithDictionaryPageSizeLimit(limit int64) WriterProperty { return func(cfg *writerPropConfig) { cfg.wr.dictPagesize = limit } } // WithBatchSize specifies the number of rows to use for batch writes to columns func WithBatchSize(batch int64) WriterProperty { return func(cfg *writerPropConfig) { cfg.wr.batchSize = batch } } // WithMaxRowGroupLength specifies the number of rows as the maximum number of rows for a given row group in the writer. func WithMaxRowGroupLength(nrows int64) WriterProperty { return func(cfg *writerPropConfig) { cfg.wr.maxRowGroupLen = nrows } } // WithDataPageSize specifies the size to use for splitting data pages for column writing. func WithDataPageSize(pgsize int64) WriterProperty { return func(cfg *writerPropConfig) { cfg.wr.pageSize = pgsize } } // WithDataPageVersion specifies whether to use Version 1 or Version 2 of the DataPage spec func WithDataPageVersion(version DataPageVersion) WriterProperty { return func(cfg *writerPropConfig) { cfg.wr.dataPageVersion = version } } // WithVersion specifies which Parquet Spec version to utilize for writing. func WithVersion(version Version) WriterProperty { return func(cfg *writerPropConfig) { cfg.wr.parquetVersion = version } } // WithCreatedBy specifies the "created by" string to use for the writer func WithCreatedBy(createdby string) WriterProperty { return func(cfg *writerPropConfig) { cfg.wr.createdBy = createdby } } // WithEncoding defines the encoding that is used when we aren't using dictionary encoding. // // This is either applied if dictionary encoding is disabled, or if we fallback if the dictionary // grew too large. func WithEncoding(encoding Encoding) WriterProperty { return func(cfg *writerPropConfig) { if encoding == Encodings.PlainDict || encoding == Encodings.RLEDict { panic("parquet: can't use dictionary encoding as fallback encoding") } cfg.wr.defColumnProps.Encoding = encoding } } // WithEncodingFor is for defining the encoding only for a specific column path. This encoding will be used // if dictionary encoding is disabled for the column or if we fallback because the dictionary grew too large func WithEncodingFor(path string, encoding Encoding) WriterProperty { return func(cfg *writerPropConfig) { if encoding == Encodings.PlainDict || encoding == Encodings.RLEDict { panic("parquet: can't use dictionary encoding as fallback encoding") } cfg.encodings[path] = encoding } } // WithEncodingPath is the same as WithEncodingFor but takes a ColumnPath directly. func WithEncodingPath(path ColumnPath, encoding Encoding) WriterProperty { return WithEncodingFor(path.String(), encoding) } // WithCompression specifies the default compression type to use for column writing. func WithCompression(codec compress.Compression) WriterProperty { return func(cfg *writerPropConfig) { cfg.wr.defColumnProps.Codec = codec } } // WithCompressionFor specifies the compression type for the given column. func WithCompressionFor(path string, codec compress.Compression) WriterProperty { return func(cfg *writerPropConfig) { cfg.codecs[path] = codec } } // WithCompressionPath is the same as WithCompressionFor but takes a ColumnPath directly. func WithCompressionPath(path ColumnPath, codec compress.Compression) WriterProperty { return WithCompressionFor(path.String(), codec) } // WithMaxStatsSize sets a maximum size for the statistics before we decide not to include them. func WithMaxStatsSize(maxStatsSize int64) WriterProperty { return func(cfg *writerPropConfig) { cfg.wr.defColumnProps.MaxStatsSize = maxStatsSize } } // WithCompressionLevel specifies the default compression level for the compressor in every column. // // The provided compression level is compressor specific. The user would have to know what the available // levels are for the selected compressor. If the compressor does not allow for selecting different // compression levels, then this function will have no effect. Parquet and Arrow will not validate the // passed compression level. If no level is selected by the user or if the special compress.DefaultCompressionLevel // value is used, then parquet will select the compression level. func WithCompressionLevel(level int) WriterProperty { return func(cfg *writerPropConfig) { cfg.wr.defColumnProps.CompressionLevel = level } } // WithCompressionLevelFor is like WithCompressionLevel but only for the given column path. func WithCompressionLevelFor(path string, level int) WriterProperty { return func(cfg *writerPropConfig) { cfg.compressLevel[path] = level } } // WithCompressionLevelPath is the same as WithCompressionLevelFor but takes a ColumnPath func WithCompressionLevelPath(path ColumnPath, level int) WriterProperty { return WithCompressionLevelFor(path.String(), level) } // WithStats specifies a default for whether or not to enable column statistics. func WithStats(enabled bool) WriterProperty { return func(cfg *writerPropConfig) { cfg.wr.defColumnProps.StatsEnabled = enabled } } // WithStatsFor specifies a per column value as to enable or disable statistics in the resulting file. func WithStatsFor(path string, enabled bool) WriterProperty { return func(cfg *writerPropConfig) { cfg.statsEnabled[path] = enabled } } // WithStatsPath is the same as WithStatsFor but takes a ColumnPath func WithStatsPath(path ColumnPath, enabled bool) WriterProperty { return WithStatsFor(path.String(), enabled) } // WithEncryptionProperties specifies the file level encryption handling for writing the file. func WithEncryptionProperties(props *FileEncryptionProperties) WriterProperty { return func(cfg *writerPropConfig) { cfg.wr.encryptionProps = props } } // WriterProperties is the collection of properties to use for writing a parquet file. The values are // read only once it has been constructed. type WriterProperties struct { mem memory.Allocator dictPagesize int64 batchSize int64 maxRowGroupLen int64 pageSize int64 parquetVersion Version createdBy string dataPageVersion DataPageVersion defColumnProps ColumnProperties columnProps map[string]*ColumnProperties encryptionProps *FileEncryptionProperties } func defaultWriterProperties() *WriterProperties { return &WriterProperties{ mem: memory.DefaultAllocator, dictPagesize: DefaultDictionaryPageSizeLimit, batchSize: DefaultWriteBatchSize, maxRowGroupLen: DefaultMaxRowGroupLen, pageSize: DefaultDataPageSize, parquetVersion: V2_LATEST, dataPageVersion: DataPageV1, createdBy: DefaultCreatedBy, defColumnProps: DefaultColumnProperties(), } } // NewWriterProperties takes a list of options for building the properties. If multiple options are used which conflict // then the last option is the one which will take effect. If no WriterProperty options are provided, then the default // properties will be utilized for writing. // // The Default properties use the following constants: // Allocator: memory.DefaultAllocator // DictionaryPageSize: DefaultDictionaryPageSizeLimit // BatchSize: DefaultWriteBatchSize // MaxRowGroupLength: DefaultMaxRowGroupLen // PageSize: DefaultDataPageSize // ParquetVersion: V1 // DataPageVersion: DataPageV1 // CreatedBy: DefaultCreatedBy func NewWriterProperties(opts ...WriterProperty) *WriterProperties { cfg := writerPropConfig{ wr: defaultWriterProperties(), encodings: make(map[string]Encoding), codecs: make(map[string]compress.Compression), compressLevel: make(map[string]int), dictEnabled: make(map[string]bool), statsEnabled: make(map[string]bool), } for _, o := range opts { o(&cfg) } cfg.wr.columnProps = make(map[string]*ColumnProperties) get := func(key string) *ColumnProperties { if p, ok := cfg.wr.columnProps[key]; ok { return p } cfg.wr.columnProps[key] = new(ColumnProperties) *cfg.wr.columnProps[key] = cfg.wr.defColumnProps return cfg.wr.columnProps[key] } for key, value := range cfg.encodings { get(key).Encoding = value } for key, value := range cfg.codecs { get(key).Codec = value } for key, value := range cfg.compressLevel { get(key).CompressionLevel = value } for key, value := range cfg.dictEnabled { get(key).DictionaryEnabled = value } for key, value := range cfg.statsEnabled { get(key).StatsEnabled = value } return cfg.wr } // FileEncryptionProperties returns the current encryption properties that were // used to create the writer properties. func (w *WriterProperties) FileEncryptionProperties() *FileEncryptionProperties { return w.encryptionProps } func (w *WriterProperties) Allocator() memory.Allocator { return w.mem } func (w *WriterProperties) CreatedBy() string { return w.createdBy } func (w *WriterProperties) WriteBatchSize() int64 { return w.batchSize } func (w *WriterProperties) DataPageSize() int64 { return w.pageSize } func (w *WriterProperties) DictionaryPageSizeLimit() int64 { return w.dictPagesize } func (w *WriterProperties) Version() Version { return w.parquetVersion } func (w *WriterProperties) DataPageVersion() DataPageVersion { return w.dataPageVersion } func (w *WriterProperties) MaxRowGroupLength() int64 { return w.maxRowGroupLen } // Compression returns the default compression type that will be used for any columns that don't // have a specific compression defined. func (w *WriterProperties) Compression() compress.Compression { return w.defColumnProps.Codec } // CompressionFor will return the compression type that is specified for the given column path, or // the default compression codec if there isn't one specific to this column. func (w *WriterProperties) CompressionFor(path string) compress.Compression { if p, ok := w.columnProps[path]; ok { return p.Codec } return w.defColumnProps.Codec } //CompressionPath is the same as CompressionFor but takes a ColumnPath func (w *WriterProperties) CompressionPath(path ColumnPath) compress.Compression { return w.CompressionFor(path.String()) } // CompressionLevel returns the default compression level that will be used for any column // that doesn't have a compression level specified for it. func (w *WriterProperties) CompressionLevel() int { return w.defColumnProps.CompressionLevel } // CompressionLevelFor returns the compression level that will be utilized for the given column, // or the default compression level if the column doesn't have a specific level specified. func (w *WriterProperties) CompressionLevelFor(path string) int { if p, ok := w.columnProps[path]; ok { return p.CompressionLevel } return w.defColumnProps.CompressionLevel } // CompressionLevelPath is the same as CompressionLevelFor but takes a ColumnPath object func (w *WriterProperties) CompressionLevelPath(path ColumnPath) int { return w.CompressionLevelFor(path.String()) } // Encoding returns the default encoding that will be utilized for any columns which don't have a different value // specified. func (w *WriterProperties) Encoding() Encoding { return w.defColumnProps.Encoding } // EncodingFor returns the encoding that will be used for the given column path, or the default encoding if there // isn't one specified for this column. func (w *WriterProperties) EncodingFor(path string) Encoding { if p, ok := w.columnProps[path]; ok { return p.Encoding } return w.defColumnProps.Encoding } // EncodingPath is the same as EncodingFor but takes a ColumnPath object func (w *WriterProperties) EncodingPath(path ColumnPath) Encoding { return w.EncodingFor(path.String()) } // DictionaryIndexEncoding returns which encoding will be used for the Dictionary Index values based on the // parquet version. V1 uses PlainDict and V2 uses RLEDict func (w *WriterProperties) DictionaryIndexEncoding() Encoding { if w.parquetVersion == V1_0 { return Encodings.PlainDict } return Encodings.RLEDict } // DictionaryPageEncoding returns the encoding that will be utilized for the DictionaryPage itself based on the parquet // version. V1 uses PlainDict, v2 uses Plain func (w *WriterProperties) DictionaryPageEncoding() Encoding { if w.parquetVersion == V1_0 { return Encodings.PlainDict } return Encodings.Plain } // DictionaryEnabled returns the default value as for whether or not dictionary encoding will be utilized for columns // that aren't separately specified. func (w *WriterProperties) DictionaryEnabled() bool { return w.defColumnProps.DictionaryEnabled } // DictionaryEnabledFor returns whether or not dictionary encoding will be used for the specified column when writing // or the default value if the column was not separately specified. func (w *WriterProperties) DictionaryEnabledFor(path string) bool { if p, ok := w.columnProps[path]; ok { return p.DictionaryEnabled } return w.defColumnProps.DictionaryEnabled } // DictionaryEnabledPath is the same as DictionaryEnabledFor but takes a ColumnPath object. func (w *WriterProperties) DictionaryEnabledPath(path ColumnPath) bool { return w.DictionaryEnabledFor(path.String()) } // StatisticsEnabled returns the default value for whether or not stats are enabled to be written for columns // that aren't separately specified. func (w *WriterProperties) StatisticsEnabled() bool { return w.defColumnProps.StatsEnabled } // StatisticsEnabledFor returns whether stats will be written for the given column path, or the default value if // it wasn't separately specified. func (w *WriterProperties) StatisticsEnabledFor(path string) bool { if p, ok := w.columnProps[path]; ok { return p.StatsEnabled } return w.defColumnProps.StatsEnabled } // StatisticsEnabledPath is the same as StatisticsEnabledFor but takes a ColumnPath object. func (w *WriterProperties) StatisticsEnabledPath(path ColumnPath) bool { return w.StatisticsEnabledFor(path.String()) } // MaxStatsSize returns the default maximum size for stats func (w *WriterProperties) MaxStatsSize() int64 { return w.defColumnProps.MaxStatsSize } // MaxStatsSizeFor returns the maximum stat size for the given column path func (w *WriterProperties) MaxStatsSizeFor(path string) int64 { if p, ok := w.columnProps[path]; ok { return p.MaxStatsSize } return w.defColumnProps.MaxStatsSize } // MaxStatsSizePath is the same as MaxStatsSizeFor but takes a ColumnPath func (w *WriterProperties) MaxStatsSizePath(path ColumnPath) int64 { return w.MaxStatsSizeFor(path.String()) } // ColumnEncryptionProperties returns the specific properties for encryption that will be used for the given column path func (w *WriterProperties) ColumnEncryptionProperties(path string) *ColumnEncryptionProperties { if w.encryptionProps != nil { return w.encryptionProps.ColumnEncryptionProperties(path) } return nil }