summaryrefslogtreecommitdiffstats
path: root/src/arrow/r/R/parquet.R
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/arrow/r/R/parquet.R585
1 files changed, 585 insertions, 0 deletions
diff --git a/src/arrow/r/R/parquet.R b/src/arrow/r/R/parquet.R
new file mode 100644
index 000000000..ee2ed57de
--- /dev/null
+++ b/src/arrow/r/R/parquet.R
@@ -0,0 +1,585 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#' Read a Parquet file
+#'
+#' '[Parquet](https://parquet.apache.org/)' is a columnar storage file format.
+#' This function enables you to read Parquet files into R.
+#'
+#' @inheritParams read_feather
+#' @param props [ParquetArrowReaderProperties]
+#' @param ... Additional arguments passed to `ParquetFileReader$create()`
+#'
+#' @return A [arrow::Table][Table], or a `data.frame` if `as_data_frame` is
+#' `TRUE` (the default).
+#' @examplesIf arrow_with_parquet()
+#' tf <- tempfile()
+#' on.exit(unlink(tf))
+#' write_parquet(mtcars, tf)
+#' df <- read_parquet(tf, col_select = starts_with("d"))
+#' head(df)
+#' @export
+read_parquet <- function(file,
+ col_select = NULL,
+ as_data_frame = TRUE,
+ props = ParquetArrowReaderProperties$create(),
+ ...) {
+ if (is.string(file)) {
+ file <- make_readable_file(file)
+ on.exit(file$close())
+ }
+ reader <- ParquetFileReader$create(file, props = props, ...)
+
+ col_select <- enquo(col_select)
+ if (!quo_is_null(col_select)) {
+ # infer which columns to keep from schema
+ schema <- reader$GetSchema()
+ names <- names(schema)
+ indices <- match(vars_select(names, !!col_select), names) - 1L
+ tab <- tryCatch(
+ reader$ReadTable(indices),
+ error = read_compressed_error
+ )
+ } else {
+ # read all columns
+ tab <- tryCatch(
+ reader$ReadTable(),
+ error = read_compressed_error
+ )
+ }
+
+ if (as_data_frame) {
+ tab <- as.data.frame(tab)
+ }
+ tab
+}
+
+#' Write Parquet file to disk
+#'
+#' [Parquet](https://parquet.apache.org/) is a columnar storage file format.
+#' This function enables you to write Parquet files from R.
+#'
+#' Due to features of the format, Parquet files cannot be appended to.
+#' If you want to use the Parquet format but also want the ability to extend
+#' your dataset, you can write to additional Parquet files and then treat
+#' the whole directory of files as a [Dataset] you can query.
+#' See `vignette("dataset", package = "arrow")` for examples of this.
+#'
+#' @param x `data.frame`, [RecordBatch], or [Table]
+#' @param sink A string file path, URI, or [OutputStream], or path in a file
+#' system (`SubTreeFileSystem`)
+#' @param chunk_size chunk size in number of rows. If NULL, the total number of rows is used.
+#' @param version parquet version, "1.0" or "2.0". Default "1.0". Numeric values
+#' are coerced to character.
+#' @param compression compression algorithm. Default "snappy". See details.
+#' @param compression_level compression level. Meaning depends on compression algorithm
+#' @param use_dictionary Specify if we should use dictionary encoding. Default `TRUE`
+#' @param write_statistics Specify if we should write statistics. Default `TRUE`
+#' @param data_page_size Set a target threshold for the approximate encoded
+#' size of data pages within a column chunk (in bytes). Default 1 MiB.
+#' @param use_deprecated_int96_timestamps Write timestamps to INT96 Parquet format. Default `FALSE`.
+#' @param coerce_timestamps Cast timestamps a particular resolution. Can be
+#' `NULL`, "ms" or "us". Default `NULL` (no casting)
+#' @param allow_truncated_timestamps Allow loss of data when coercing timestamps to a
+#' particular resolution. E.g. if microsecond or nanosecond data is lost when coercing
+#' to "ms", do not raise an exception
+#' @param properties A `ParquetWriterProperties` object, used instead of the options
+#' enumerated in this function's signature. Providing `properties` as an argument
+#' is deprecated; if you need to assemble `ParquetWriterProperties` outside
+#' of `write_parquet()`, use `ParquetFileWriter` instead.
+#' @param arrow_properties A `ParquetArrowWriterProperties` object. Like
+#' `properties`, this argument is deprecated.
+#'
+#' @details The parameters `compression`, `compression_level`, `use_dictionary` and
+#' `write_statistics` support various patterns:
+#'
+#' - The default `NULL` leaves the parameter unspecified, and the C++ library
+#' uses an appropriate default for each column (defaults listed above)
+#' - A single, unnamed, value (e.g. a single string for `compression`) applies to all columns
+#' - An unnamed vector, of the same size as the number of columns, to specify a
+#' value for each column, in positional order
+#' - A named vector, to specify the value for the named columns, the default
+#' value for the setting is used when not supplied
+#'
+#' The `compression` argument can be any of the following (case insensitive):
+#' "uncompressed", "snappy", "gzip", "brotli", "zstd", "lz4", "lzo" or "bz2".
+#' Only "uncompressed" is guaranteed to be available, but "snappy" and "gzip"
+#' are almost always included. See [codec_is_available()].
+#' The default "snappy" is used if available, otherwise "uncompressed". To
+#' disable compression, set `compression = "uncompressed"`.
+#' Note that "uncompressed" columns may still have dictionary encoding.
+#'
+#' @return the input `x` invisibly.
+#'
+#' @examplesIf arrow_with_parquet()
+#' tf1 <- tempfile(fileext = ".parquet")
+#' write_parquet(data.frame(x = 1:5), tf1)
+#'
+#' # using compression
+#' if (codec_is_available("gzip")) {
+#' tf2 <- tempfile(fileext = ".gz.parquet")
+#' write_parquet(data.frame(x = 1:5), tf2, compression = "gzip", compression_level = 5)
+#' }
+#' @export
+write_parquet <- function(x,
+ sink,
+ chunk_size = NULL,
+ # writer properties
+ version = NULL,
+ compression = default_parquet_compression(),
+ compression_level = NULL,
+ use_dictionary = NULL,
+ write_statistics = NULL,
+ data_page_size = NULL,
+ # arrow writer properties
+ use_deprecated_int96_timestamps = FALSE,
+ coerce_timestamps = NULL,
+ allow_truncated_timestamps = FALSE,
+ properties = NULL,
+ arrow_properties = NULL) {
+ x_out <- x
+
+ if (is.data.frame(x) || inherits(x, "RecordBatch")) {
+ x <- Table$create(x)
+ }
+
+ assert_that(is_writable_table(x))
+
+ if (!inherits(sink, "OutputStream")) {
+ sink <- make_output_stream(sink)
+ on.exit(sink$close())
+ }
+
+ # Deprecation warnings
+ if (!is.null(properties)) {
+ warning(
+ "Providing 'properties' is deprecated. If you need to assemble properties outside ",
+ "this function, use ParquetFileWriter instead."
+ )
+ }
+ if (!is.null(arrow_properties)) {
+ warning(
+ "Providing 'arrow_properties' is deprecated. If you need to assemble arrow_properties ",
+ "outside this function, use ParquetFileWriter instead."
+ )
+ }
+
+ writer <- ParquetFileWriter$create(
+ x$schema,
+ sink,
+ properties = properties %||% ParquetWriterProperties$create(
+ x,
+ version = version,
+ compression = compression,
+ compression_level = compression_level,
+ use_dictionary = use_dictionary,
+ write_statistics = write_statistics,
+ data_page_size = data_page_size
+ ),
+ arrow_properties = arrow_properties %||% ParquetArrowWriterProperties$create(
+ use_deprecated_int96_timestamps = use_deprecated_int96_timestamps,
+ coerce_timestamps = coerce_timestamps,
+ allow_truncated_timestamps = allow_truncated_timestamps
+ )
+ )
+ writer$WriteTable(x, chunk_size = chunk_size %||% x$num_rows)
+ writer$Close()
+
+ invisible(x_out)
+}
+
+default_parquet_compression <- function() {
+ # Match the pyarrow default (overriding the C++ default)
+ if (codec_is_available("snappy")) {
+ "snappy"
+ } else {
+ NULL
+ }
+}
+
+ParquetArrowWriterProperties <- R6Class("ParquetArrowWriterProperties", inherit = ArrowObject)
+ParquetArrowWriterProperties$create <- function(use_deprecated_int96_timestamps = FALSE,
+ coerce_timestamps = NULL,
+ allow_truncated_timestamps = FALSE,
+ ...) {
+ if (is.null(coerce_timestamps)) {
+ timestamp_unit <- -1L # null sentinel value
+ } else {
+ timestamp_unit <- make_valid_time_unit(
+ coerce_timestamps,
+ c("ms" = TimeUnit$MILLI, "us" = TimeUnit$MICRO)
+ )
+ }
+ parquet___ArrowWriterProperties___create(
+ use_deprecated_int96_timestamps = isTRUE(use_deprecated_int96_timestamps),
+ timestamp_unit = timestamp_unit,
+ allow_truncated_timestamps = isTRUE(allow_truncated_timestamps)
+ )
+}
+
+valid_parquet_version <- c(
+ "1.0" = ParquetVersionType$PARQUET_1_0,
+ "2.0" = ParquetVersionType$PARQUET_2_0
+)
+
+make_valid_version <- function(version, valid_versions = valid_parquet_version) {
+ if (is_integerish(version)) {
+ version <- as.character(version)
+ }
+ tryCatch(
+ valid_versions[[match.arg(version, choices = names(valid_versions))]],
+ error = function(cond) {
+ stop('"version" should be one of ', oxford_paste(names(valid_versions), "or"), call. = FALSE)
+ }
+ )
+}
+
+#' @title ParquetWriterProperties class
+#' @rdname ParquetWriterProperties
+#' @name ParquetWriterProperties
+#' @docType class
+#' @usage NULL
+#' @format NULL
+#' @description This class holds settings to control how a Parquet file is read
+#' by [ParquetFileWriter].
+#'
+#' @section Factory:
+#'
+#' The `ParquetWriterProperties$create()` factory method instantiates the object
+#' and takes the following arguments:
+#'
+#' - `table`: table to write (required)
+#' - `version`: Parquet version, "1.0" or "2.0". Default "1.0"
+#' - `compression`: Compression type, algorithm `"uncompressed"`
+#' - `compression_level`: Compression level; meaning depends on compression algorithm
+#' - `use_dictionary`: Specify if we should use dictionary encoding. Default `TRUE`
+#' - `write_statistics`: Specify if we should write statistics. Default `TRUE`
+#' - `data_page_size`: Set a target threshold for the approximate encoded
+#' size of data pages within a column chunk (in bytes). Default 1 MiB.
+#'
+#' @details The parameters `compression`, `compression_level`, `use_dictionary`
+#' and write_statistics` support various patterns:
+#'
+#' - The default `NULL` leaves the parameter unspecified, and the C++ library
+#' uses an appropriate default for each column (defaults listed above)
+#' - A single, unnamed, value (e.g. a single string for `compression`) applies to all columns
+#' - An unnamed vector, of the same size as the number of columns, to specify a
+#' value for each column, in positional order
+#' - A named vector, to specify the value for the named columns, the default
+#' value for the setting is used when not supplied
+#'
+#' Unlike the high-level [write_parquet], `ParquetWriterProperties` arguments
+#' use the C++ defaults. Currently this means "uncompressed" rather than
+#' "snappy" for the `compression` argument.
+#'
+#' @seealso [write_parquet]
+#' @seealso [Schema] for information about schemas and metadata handling.
+#'
+#' @export
+ParquetWriterProperties <- R6Class("ParquetWriterProperties", inherit = ArrowObject)
+ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder",
+ inherit = ArrowObject,
+ public = list(
+ set_version = function(version) {
+ parquet___WriterProperties___Builder__version(self, make_valid_version(version))
+ },
+ set_compression = function(table, compression) {
+ compression <- compression_from_name(compression)
+ assert_that(is.integer(compression))
+ private$.set(
+ table, compression,
+ parquet___ArrowWriterProperties___Builder__set_compressions
+ )
+ },
+ set_compression_level = function(table, compression_level) {
+ # cast to integer but keep names
+ compression_level <- set_names(as.integer(compression_level), names(compression_level))
+ private$.set(
+ table, compression_level,
+ parquet___ArrowWriterProperties___Builder__set_compression_levels
+ )
+ },
+ set_dictionary = function(table, use_dictionary) {
+ assert_that(is.logical(use_dictionary))
+ private$.set(
+ table, use_dictionary,
+ parquet___ArrowWriterProperties___Builder__set_use_dictionary
+ )
+ },
+ set_write_statistics = function(table, write_statistics) {
+ assert_that(is.logical(write_statistics))
+ private$.set(
+ table, write_statistics,
+ parquet___ArrowWriterProperties___Builder__set_write_statistics
+ )
+ },
+ set_data_page_size = function(data_page_size) {
+ parquet___ArrowWriterProperties___Builder__data_page_size(self, data_page_size)
+ }
+ ),
+ private = list(
+ .set = function(table, value, FUN) {
+ msg <- paste0("unsupported ", substitute(value), "= specification")
+ column_names <- names(table)
+ given_names <- names(value)
+ if (is.null(given_names)) {
+ if (length(value) %in% c(1L, length(column_names))) {
+ # If there's a single, unnamed value, FUN will set it globally
+ # If there are values for all columns, send them along with the names
+ FUN(self, column_names, value)
+ } else {
+ abort(msg)
+ }
+ } else if (all(given_names %in% column_names)) {
+ # Use the given names
+ FUN(self, given_names, value)
+ } else {
+ abort(msg)
+ }
+ }
+ )
+)
+
+ParquetWriterProperties$create <- function(table,
+ version = NULL,
+ compression = default_parquet_compression(),
+ compression_level = NULL,
+ use_dictionary = NULL,
+ write_statistics = NULL,
+ data_page_size = NULL,
+ ...) {
+ builder <- parquet___WriterProperties___Builder__create()
+ if (!is.null(version)) {
+ builder$set_version(version)
+ }
+ if (!is.null(compression)) {
+ builder$set_compression(table, compression = compression)
+ }
+ if (!is.null(compression_level)) {
+ builder$set_compression_level(table, compression_level = compression_level)
+ }
+ if (!is.null(use_dictionary)) {
+ builder$set_dictionary(table, use_dictionary)
+ }
+ if (!is.null(write_statistics)) {
+ builder$set_write_statistics(table, write_statistics)
+ }
+ if (!is.null(data_page_size)) {
+ builder$set_data_page_size(data_page_size)
+ }
+ parquet___WriterProperties___Builder__build(builder)
+}
+
+#' @title ParquetFileWriter class
+#' @rdname ParquetFileWriter
+#' @name ParquetFileWriter
+#' @docType class
+#' @usage NULL
+#' @format NULL
+#' @description This class enables you to interact with Parquet files.
+#'
+#' @section Factory:
+#'
+#' The `ParquetFileWriter$create()` factory method instantiates the object and
+#' takes the following arguments:
+#'
+#' - `schema` A [Schema]
+#' - `sink` An [arrow::io::OutputStream][OutputStream]
+#' - `properties` An instance of [ParquetWriterProperties]
+#' - `arrow_properties` An instance of `ParquetArrowWriterProperties`
+#'
+#' @section Methods:
+#'
+#' - `WriteTable` Write a [Table] to `sink`
+#' - `Close` Close the writer. Note: does not close the `sink`.
+#' [arrow::io::OutputStream][OutputStream] has its own `close()` method.
+#'
+#' @export
+#' @include arrow-package.R
+ParquetFileWriter <- R6Class("ParquetFileWriter",
+ inherit = ArrowObject,
+ public = list(
+ WriteTable = function(table, chunk_size) {
+ parquet___arrow___FileWriter__WriteTable(self, table, chunk_size)
+ },
+ Close = function() parquet___arrow___FileWriter__Close(self)
+ )
+)
+ParquetFileWriter$create <- function(schema,
+ sink,
+ properties = ParquetWriterProperties$create(),
+ arrow_properties = ParquetArrowWriterProperties$create()) {
+ assert_is(sink, "OutputStream")
+ parquet___arrow___ParquetFileWriter__Open(schema, sink, properties, arrow_properties)
+}
+
+
+#' @title ParquetFileReader class
+#' @rdname ParquetFileReader
+#' @name ParquetFileReader
+#' @docType class
+#' @usage NULL
+#' @format NULL
+#' @description This class enables you to interact with Parquet files.
+#'
+#' @section Factory:
+#'
+#' The `ParquetFileReader$create()` factory method instantiates the object and
+#' takes the following arguments:
+#'
+#' - `file` A character file name, raw vector, or Arrow file connection object
+#' (e.g. `RandomAccessFile`).
+#' - `props` Optional [ParquetArrowReaderProperties]
+#' - `mmap` Logical: whether to memory-map the file (default `TRUE`)
+#' - `...` Additional arguments, currently ignored
+#'
+#' @section Methods:
+#'
+#' - `$ReadTable(column_indices)`: get an `arrow::Table` from the file. The optional
+#' `column_indices=` argument is a 0-based integer vector indicating which columns to retain.
+#' - `$ReadRowGroup(i, column_indices)`: get an `arrow::Table` by reading the `i`th row group (0-based).
+#' The optional `column_indices=` argument is a 0-based integer vector indicating which columns to retain.
+#' - `$ReadRowGroups(row_groups, column_indices)`: get an `arrow::Table` by reading several row
+#' groups (0-based integers).
+#' The optional `column_indices=` argument is a 0-based integer vector indicating which columns to retain.
+#' - `$GetSchema()`: get the `arrow::Schema` of the data in the file
+#' - `$ReadColumn(i)`: read the `i`th column (0-based) as a [ChunkedArray].
+#'
+#' @section Active bindings:
+#'
+#' - `$num_rows`: number of rows.
+#' - `$num_columns`: number of columns.
+#' - `$num_row_groups`: number of row groups.
+#'
+#' @export
+#' @examplesIf arrow_with_parquet()
+#' f <- system.file("v0.7.1.parquet", package = "arrow")
+#' pq <- ParquetFileReader$create(f)
+#' pq$GetSchema()
+#' if (codec_is_available("snappy")) {
+#' # This file has compressed data columns
+#' tab <- pq$ReadTable()
+#' tab$schema
+#' }
+#' @include arrow-package.R
+ParquetFileReader <- R6Class("ParquetFileReader",
+ inherit = ArrowObject,
+ active = list(
+ num_rows = function() {
+ as.integer(parquet___arrow___FileReader__num_rows(self))
+ },
+ num_columns = function() {
+ parquet___arrow___FileReader__num_columns(self)
+ },
+ num_row_groups = function() {
+ parquet___arrow___FileReader__num_row_groups(self)
+ }
+ ),
+ public = list(
+ ReadTable = function(column_indices = NULL) {
+ if (is.null(column_indices)) {
+ parquet___arrow___FileReader__ReadTable1(self)
+ } else {
+ column_indices <- vec_cast(column_indices, integer())
+ parquet___arrow___FileReader__ReadTable2(self, column_indices)
+ }
+ },
+ ReadRowGroup = function(i, column_indices = NULL) {
+ i <- vec_cast(i, integer())
+ if (is.null(column_indices)) {
+ parquet___arrow___FileReader__ReadRowGroup1(self, i)
+ } else {
+ column_indices <- vec_cast(column_indices, integer())
+ parquet___arrow___FileReader__ReadRowGroup2(self, i, column_indices)
+ }
+ },
+ ReadRowGroups = function(row_groups, column_indices = NULL) {
+ row_groups <- vec_cast(row_groups, integer())
+ if (is.null(column_indices)) {
+ parquet___arrow___FileReader__ReadRowGroups1(self, row_groups)
+ } else {
+ column_indices <- vec_cast(column_indices, integer())
+ parquet___arrow___FileReader__ReadRowGroups2(self, row_groups, column_indices)
+ }
+ },
+ ReadColumn = function(i) {
+ i <- vec_cast(i, integer())
+ parquet___arrow___FileReader__ReadColumn(self, i)
+ },
+ GetSchema = function() {
+ parquet___arrow___FileReader__GetSchema(self)
+ }
+ )
+)
+
+ParquetFileReader$create <- function(file,
+ props = ParquetArrowReaderProperties$create(),
+ mmap = TRUE,
+ ...) {
+ file <- make_readable_file(file, mmap)
+ assert_is(props, "ParquetArrowReaderProperties")
+
+ parquet___arrow___FileReader__OpenFile(file, props)
+}
+
+#' @title ParquetArrowReaderProperties class
+#' @rdname ParquetArrowReaderProperties
+#' @name ParquetArrowReaderProperties
+#' @docType class
+#' @usage NULL
+#' @format NULL
+#' @description This class holds settings to control how a Parquet file is read
+#' by [ParquetFileReader].
+#'
+#' @section Factory:
+#'
+#' The `ParquetArrowReaderProperties$create()` factory method instantiates the object
+#' and takes the following arguments:
+#'
+#' - `use_threads` Logical: whether to use multithreading (default `TRUE`)
+#'
+#' @section Methods:
+#'
+#' - `$read_dictionary(column_index)`
+#' - `$set_read_dictionary(column_index, read_dict)`
+#' - `$use_threads(use_threads)`
+#'
+#' @export
+ParquetArrowReaderProperties <- R6Class("ParquetArrowReaderProperties",
+ inherit = ArrowObject,
+ public = list(
+ read_dictionary = function(column_index) {
+ parquet___arrow___ArrowReaderProperties__get_read_dictionary(self, column_index)
+ },
+ set_read_dictionary = function(column_index, read_dict) {
+ parquet___arrow___ArrowReaderProperties__set_read_dictionary(self, column_index, read_dict)
+ }
+ ),
+ active = list(
+ use_threads = function(use_threads) {
+ if (missing(use_threads)) {
+ parquet___arrow___ArrowReaderProperties__get_use_threads(self)
+ } else {
+ parquet___arrow___ArrowReaderProperties__set_use_threads(self, use_threads)
+ }
+ }
+ )
+)
+
+ParquetArrowReaderProperties$create <- function(use_threads = option_use_threads()) {
+ parquet___arrow___ArrowReaderProperties__Make(isTRUE(use_threads))
+}