author:    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer: Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit:    e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree:      64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/r/R/csv.R
parent:    Initial commit. (diff)
download:  ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
           ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2. (tag: upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/r/R/csv.R')
-rw-r--r-- | src/arrow/r/R/csv.R | 644
1 file changed, 644 insertions, 0 deletions
diff --git a/src/arrow/r/R/csv.R b/src/arrow/r/R/csv.R
new file mode 100644
index 000000000..ee890578f
--- /dev/null
+++ b/src/arrow/r/R/csv.R
@@ -0,0 +1,644 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#' Read a CSV or other delimited file with Arrow
+#'
+#' These functions use the Arrow C++ CSV reader to read into a `data.frame`.
+#' Arrow C++ options have been mapped to argument names that follow those of
+#' `readr::read_delim()`, and `col_select` was inspired by `vroom::vroom()`.
+#'
+#' `read_csv_arrow()` and `read_tsv_arrow()` are wrappers around
+#' `read_delim_arrow()` that specify a delimiter.
+#'
+#' Note that not all `readr` options are currently implemented here. Please file
+#' an issue if you encounter one that `arrow` should support.
+#'
+#' If you need to control Arrow-specific reader parameters that don't have an
+#' equivalent in `readr::read_csv()`, you can either provide them in the
+#' `parse_options`, `convert_options`, or `read_options` arguments, or you can
+#' use [CsvTableReader] directly for lower-level access.
+#'
+#' @section Specifying column types and names:
+#'
+#' By default, the CSV reader will infer the column names and data types from the file, but there
+#' are a few ways you can specify them directly.
+#'
+#' One way is to provide an Arrow [Schema] in the `schema` argument,
+#' which is an ordered map of column name to type.
+#' When provided, it satisfies both the `col_names` and `col_types` arguments.
+#' This is good if you know all of this information up front.
+#'
+#' You can also pass a `Schema` to the `col_types` argument. If you do this,
+#' column names will still be inferred from the file unless you also specify
+#' `col_names`. In either case, the column names in the `Schema` must match the
+#' data's column names, whether they are explicitly provided or inferred. That
+#' said, this `Schema` does not have to reference all columns: those omitted
+#' will have their types inferred.
+#'
+#' Alternatively, you can declare column types by providing the compact string representation
+#' that `readr` uses to the `col_types` argument. This means you provide a
+#' single string, one character per column, where the characters map to Arrow
+#' types analogously to the `readr` type mapping:
+#'
+#' * "c": `utf8()`
+#' * "i": `int32()`
+#' * "n": `float64()`
+#' * "d": `float64()`
+#' * "l": `bool()`
+#' * "f": `dictionary()`
+#' * "D": `date32()`
+#' * "T": `timestamp()`
+#' * "t": `time32()`
+#' * "_": `null()`
+#' * "-": `null()`
+#' * "?": infer the type from the data
+#'
+#' If you use the compact string representation for `col_types`, you must also
+#' specify `col_names`.
+#'
+#' Regardless of how types are specified, all columns with a `null()` type will
+#' be dropped.
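+#'
+#' As a minimal sketch of the compact form (assuming `tf` is the path to a
+#' CSV file whose first row is a header; the column names are illustrative):
+#'
+#' ```r
+#' # Three columns: utf8, float64, int32. The compact form requires
+#' # `col_names`, and `skip = 1` skips the header row that would
+#' # otherwise be read as data.
+#' df <- read_csv_arrow(tf, col_types = "cdi",
+#'                      col_names = c("a", "b", "c"), skip = 1)
+#' ```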
+#'
+#' Note that if you are specifying column names, whether by `schema` or
+#' `col_names`, and the CSV file has a header row that would otherwise be used
+#' to identify column names, you'll need to add `skip = 1` to skip that row.
+#'
+#' @param file A character file name or URI, a `raw` vector, an Arrow input stream,
+#' or a `FileSystem` with path (`SubTreeFileSystem`).
+#' If a file name, a memory-mapped Arrow [InputStream] will be opened and
+#' closed when finished; compression will be detected from the file extension
+#' and handled automatically. If an input stream is provided, it will be left
+#' open.
+#' @param delim Single character used to separate fields within a record.
+#' @param quote Single character used to quote strings.
+#' @param escape_double Does the file escape quotes by doubling them?
+#' i.e. If this option is `TRUE`, the value `""""` represents
+#' a single quote, `\"`.
+#' @param escape_backslash Does the file use backslashes to escape special
+#' characters? This is more general than `escape_double` as backslashes
+#' can be used to escape the delimiter character, the quote character, or
+#' to add special characters like `\\n`.
+#' @param schema [Schema] that describes the table. If provided, it will be
+#' used to satisfy both `col_names` and `col_types`.
+#' @param col_names If `TRUE`, the first row of the input will be used as the
+#' column names and will not be included in the data frame. If `FALSE`, column
+#' names will be generated by Arrow, starting with "f0", "f1", ..., "fN".
+#' Alternatively, you can specify a character vector of column names.
+#' @param col_types A compact string representation of the column types, an
+#' Arrow [Schema], or `NULL` (the default) to infer types from the data.
+#' @param col_select A character vector of column names to keep, as in the
+#' "select" argument to `data.table::fread()`, or a
+#' [tidy selection specification][tidyselect::vars_select()]
+#' of columns, as used in `dplyr::select()`.
+#' @param na A character vector of strings to interpret as missing values.
+#' @param quoted_na Should missing values inside quotes be treated as missing
+#' values (the default) or strings? (Note that this is different from
+#' the Arrow C++ default for the corresponding convert option,
+#' `strings_can_be_null`.)
+#' @param skip_empty_rows Should blank rows be ignored altogether? If
+#' `TRUE`, blank rows will not be represented at all. If `FALSE`, they will be
+#' filled with missings.
+#' @param skip Number of lines to skip before reading data.
+#' @param timestamp_parsers User-defined timestamp parsers. If more than one
+#' parser is specified, the CSV conversion logic will try parsing values
+#' starting from the beginning of this vector. Possible values are:
+#' - `NULL`: the default, which uses the ISO-8601 parser
+#' - a character vector of [strptime][base::strptime()] parse strings
+#' - a list of [TimestampParser] objects
+#' @param parse_options see [file reader options][CsvReadOptions].
+#' If given, this overrides any
+#' parsing options provided in other arguments (e.g. `delim`, `quote`, etc.).
+#' @param convert_options see [file reader options][CsvReadOptions]
+#' @param read_options see [file reader options][CsvReadOptions]
+#' @param as_data_frame Should the function return a `data.frame` (default) or
+#' an Arrow [Table]?
+#'
+#' @return A `data.frame`, or a Table if `as_data_frame = FALSE`.
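+#' @details As a hedged sketch of overriding the readr-style arguments with
+#' explicit Arrow options (`tf` is an assumed path to a semicolon-delimited
+#' file):
+#'
+#' ```r
+#' # When `parse_options` is given, it takes precedence over `delim`,
+#' # `quote`, and the other readr-style parsing arguments.
+#' opts <- CsvParseOptions$create(delimiter = ";")
+#' df <- read_delim_arrow(tf, parse_options = opts)
+#' ```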
+#' @export
+#' @examplesIf arrow_available()
+#' tf <- tempfile()
+#' on.exit(unlink(tf))
+#' write.csv(mtcars, file = tf)
+#' df <- read_csv_arrow(tf)
+#' dim(df)
+#' # Can select columns
+#' df <- read_csv_arrow(tf, col_select = starts_with("d"))
+read_delim_arrow <- function(file,
+                             delim = ",",
+                             quote = '"',
+                             escape_double = TRUE,
+                             escape_backslash = FALSE,
+                             schema = NULL,
+                             col_names = TRUE,
+                             col_types = NULL,
+                             col_select = NULL,
+                             na = c("", "NA"),
+                             quoted_na = TRUE,
+                             skip_empty_rows = TRUE,
+                             skip = 0L,
+                             parse_options = NULL,
+                             convert_options = NULL,
+                             read_options = NULL,
+                             as_data_frame = TRUE,
+                             timestamp_parsers = NULL) {
+  if (inherits(schema, "Schema")) {
+    col_names <- names(schema)
+    col_types <- schema
+  }
+  if (is.null(parse_options)) {
+    parse_options <- readr_to_csv_parse_options(
+      delim,
+      quote,
+      escape_double,
+      escape_backslash,
+      skip_empty_rows
+    )
+  }
+  if (is.null(read_options)) {
+    read_options <- readr_to_csv_read_options(skip, col_names)
+  }
+  if (is.null(convert_options)) {
+    convert_options <- readr_to_csv_convert_options(
+      na,
+      quoted_na,
+      col_types = col_types,
+      col_names = read_options$column_names,
+      timestamp_parsers = timestamp_parsers
+    )
+  }
+
+  if (!inherits(file, "InputStream")) {
+    file <- make_readable_file(file)
+    on.exit(file$close())
+  }
+  reader <- CsvTableReader$create(
+    file,
+    read_options = read_options,
+    parse_options = parse_options,
+    convert_options = convert_options
+  )
+
+  tab <- reader$Read()
+
+  # TODO: move this into convert_options using include_columns
+  col_select <- enquo(col_select)
+  if (!quo_is_null(col_select)) {
+    tab <- tab[vars_select(names(tab), !!col_select)]
+  }
+
+  if (isTRUE(as_data_frame)) {
+    tab <- as.data.frame(tab)
+  }
+
+  tab
+}
+
+#' @rdname read_delim_arrow
+#' @export
+read_csv_arrow <- function(file,
+                           quote = '"',
+                           escape_double = TRUE,
+                           escape_backslash = FALSE,
+                           schema = NULL,
+                           col_names = TRUE,
+                           col_types = NULL,
+                           col_select = NULL,
+                           na = c("", "NA"),
+                           quoted_na = TRUE,
+                           skip_empty_rows = TRUE,
+                           skip = 0L,
+                           parse_options = NULL,
+                           convert_options = NULL,
+                           read_options = NULL,
+                           as_data_frame = TRUE,
+                           timestamp_parsers = NULL) {
+  mc <- match.call()
+  mc$delim <- ","
+  mc[[1]] <- get("read_delim_arrow", envir = asNamespace("arrow"))
+  eval.parent(mc)
+}
+
+#' @rdname read_delim_arrow
+#' @export
+read_tsv_arrow <- function(file,
+                           quote = '"',
+                           escape_double = TRUE,
+                           escape_backslash = FALSE,
+                           schema = NULL,
+                           col_names = TRUE,
+                           col_types = NULL,
+                           col_select = NULL,
+                           na = c("", "NA"),
+                           quoted_na = TRUE,
+                           skip_empty_rows = TRUE,
+                           skip = 0L,
+                           parse_options = NULL,
+                           convert_options = NULL,
+                           read_options = NULL,
+                           as_data_frame = TRUE,
+                           timestamp_parsers = NULL) {
+  mc <- match.call()
+  mc$delim <- "\t"
+  mc[[1]] <- get("read_delim_arrow", envir = asNamespace("arrow"))
+  eval.parent(mc)
+}
+
+#' @title Arrow CSV and JSON table reader classes
+#' @rdname CsvTableReader
+#' @name CsvTableReader
+#' @docType class
+#' @usage NULL
+#' @format NULL
+#' @description `CsvTableReader` and `JsonTableReader` wrap the Arrow C++ CSV
+#' and JSON table readers. See their usage in [read_csv_arrow()] and
+#' [read_json_arrow()], respectively.
+#'
+#' @section Factory:
+#'
+#' The `CsvTableReader$create()` and `JsonTableReader$create()` factory methods
+#' take the following arguments:
+#'
+#' - `file` An Arrow [InputStream]
+#' - `convert_options` (CSV only), `parse_options`, `read_options`: see
+#' [CsvReadOptions]
+#' - `...` additional parameters.
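+#'
+#' A minimal usage sketch (assuming `tf` is the path to a CSV file; the
+#' [ReadableFile] stream here stands in for any Arrow [InputStream]):
+#'
+#' ```r
+#' stream <- ReadableFile$create(tf)
+#' reader <- CsvTableReader$create(stream)
+#' tab <- reader$Read()  # an Arrow Table
+#' stream$close()
+#' ```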
+#'
+#' @section Methods:
+#'
+#' - `$Read()`: returns an Arrow Table.
+#'
+#' @include arrow-package.R
+#' @export
+CsvTableReader <- R6Class("CsvTableReader",
+  inherit = ArrowObject,
+  public = list(
+    Read = function() csv___TableReader__Read(self)
+  )
+)
+CsvTableReader$create <- function(file,
+                                  read_options = CsvReadOptions$create(),
+                                  parse_options = CsvParseOptions$create(),
+                                  convert_options = CsvConvertOptions$create(),
+                                  ...) {
+  assert_is(file, "InputStream")
+  csv___TableReader__Make(file, read_options, parse_options, convert_options)
+}
+
+#' @title File reader options
+#' @rdname CsvReadOptions
+#' @name CsvReadOptions
+#' @docType class
+#' @usage NULL
+#' @format NULL
+#' @description `CsvReadOptions`, `CsvParseOptions`, `CsvConvertOptions`,
+#' `JsonReadOptions`, `JsonParseOptions`, and `TimestampParser` are containers for various
+#' file reading options. See their usage in [read_csv_arrow()] and
+#' [read_json_arrow()], respectively.
+#'
+#' @section Factory:
+#'
+#' The `CsvReadOptions$create()` and `JsonReadOptions$create()` factory methods
+#' take the following arguments:
+#'
+#' - `use_threads` Whether to use the global CPU thread pool
+#' - `block_size` Block size we request from the IO layer; also determines
+#' the size of chunks when use_threads is `TRUE`. NB: if `FALSE`, JSON input
+#' must end with an empty line.
+#'
+#' `CsvReadOptions$create()` further accepts these additional arguments:
+#'
+#' - `skip_rows` Number of lines to skip before reading data (default 0)
+#' - `column_names` Character vector to supply column names. If length-0
+#' (the default), the first non-skipped row will be parsed to generate column
+#' names, unless `autogenerate_column_names` is `TRUE`.
+#' - `autogenerate_column_names` Logical: generate column names instead of
+#' using the first non-skipped row (the default)? If `TRUE`, column names will
+#' be "f0", "f1", ..., "fN".
+#'
+#' `CsvParseOptions$create()` takes the following arguments:
+#'
+#' - `delimiter` Field delimiting character (default `","`)
+#' - `quoting` Logical: are strings quoted? (default `TRUE`)
+#' - `quote_char` Quoting character, if `quoting` is `TRUE`
+#' - `double_quote` Logical: are quotes inside values double-quoted? (default `TRUE`)
+#' - `escaping` Logical: whether escaping is used (default `FALSE`)
+#' - `escape_char` Escaping character, if `escaping` is `TRUE`
+#' - `newlines_in_values` Logical: are values allowed to contain CR (`0x0d`)
+#' and LF (`0x0a`) characters? (default `FALSE`)
+#' - `ignore_empty_lines` Logical: should empty lines be ignored (default) or
+#' generate a row of missing values (if `FALSE`)?
+#'
+#' `JsonParseOptions$create()` accepts only the `newlines_in_values` argument.
+#'
+#' `CsvConvertOptions$create()` takes the following arguments:
+#'
+#' - `check_utf8` Logical: check UTF8 validity of string columns? (default `TRUE`)
+#' - `null_values` character vector of recognized spellings for null values.
+#' Analogous to the `na.strings` argument to
+#' [`read.csv()`][utils::read.csv()] or `na` in `readr::read_csv()`.
+#' - `strings_can_be_null` Logical: can string / binary columns have
+#' null values? Similar to the `quoted_na` argument to `readr::read_csv()`.
+#' (default `FALSE`)
+#' - `true_values` character vector of recognized spellings for `TRUE` values
+#' - `false_values` character vector of recognized spellings for `FALSE` values
+#' - `col_types` A `Schema` or `NULL` to infer types
+#' - `auto_dict_encode` Logical: whether to try to automatically
+#' dictionary-encode string / binary data (think `stringsAsFactors`). Default `FALSE`.
+#' This setting is ignored for non-inferred columns (those in `col_types`).
+#' - `auto_dict_max_cardinality` If `auto_dict_encode`, string/binary columns
+#' are dictionary-encoded up to this number of unique values (default 50),
+#' after which it switches to regular encoding.
+#' - `include_columns` If non-empty, indicates the names of columns from the
+#' CSV file that should actually be read and converted (in the vector's order).
+#' - `include_missing_columns` Logical: if `include_columns` is provided, should
+#' columns named in it but not found in the data be included as a column of
+#' type `null()`? The default (`FALSE`) means that the reader will instead
+#' raise an error.
+#' - `timestamp_parsers` User-defined timestamp parsers. If more than one
+#' parser is specified, the CSV conversion logic will try parsing values
+#' starting from the beginning of this vector. Possible values are
+#' (a) `NULL`, the default, which uses the ISO-8601 parser;
+#' (b) a character vector of [strptime][base::strptime()] parse strings; or
+#' (c) a list of [TimestampParser] objects.
+#'
+#' `TimestampParser$create()` takes an optional `format` string argument.
+#' See [`strptime()`][base::strptime()] for example syntax.
+#' The default is to use an ISO-8601 format parser.
+#'
+#' The `CsvWriteOptions$create()` factory method takes the following arguments:
+#' - `include_header` Whether to write an initial header line with column names
+#' - `batch_size` Maximum number of rows processed at a time. Default is 1024.
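+#'
+#' As a rough sketch of passing these options through [read_csv_arrow()]
+#' (the spellings, format string, and `tf` path are illustrative):
+#'
+#' ```r
+#' convert_opts <- CsvConvertOptions$create(
+#'   true_values = "yes",
+#'   false_values = "no",
+#'   timestamp_parsers = list(TimestampParser$create(format = "%d/%m/%Y"))
+#' )
+#' df <- read_csv_arrow(tf, convert_options = convert_opts)
+#' ```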
+#'
+#' @section Active bindings:
+#'
+#' - `column_names`: from `CsvReadOptions`
+#'
+#' @export
+CsvReadOptions <- R6Class("CsvReadOptions",
+  inherit = ArrowObject,
+  active = list(
+    column_names = function() csv___ReadOptions__column_names(self)
+  )
+)
+CsvReadOptions$create <- function(use_threads = option_use_threads(),
+                                  block_size = 1048576L,
+                                  skip_rows = 0L,
+                                  column_names = character(0),
+                                  autogenerate_column_names = FALSE) {
+  csv___ReadOptions__initialize(
+    list(
+      use_threads = use_threads,
+      block_size = block_size,
+      skip_rows = skip_rows,
+      column_names = column_names,
+      autogenerate_column_names = autogenerate_column_names
+    )
+  )
+}
+
+#' @rdname CsvReadOptions
+#' @export
+CsvWriteOptions <- R6Class("CsvWriteOptions", inherit = ArrowObject)
+CsvWriteOptions$create <- function(include_header = TRUE, batch_size = 1024L) {
+  assert_that(is_integerish(batch_size, n = 1, finite = TRUE), batch_size > 0)
+  csv___WriteOptions__initialize(
+    list(
+      include_header = include_header,
+      batch_size = as.integer(batch_size)
+    )
+  )
+}
+
+readr_to_csv_read_options <- function(skip, col_names) {
+  if (isTRUE(col_names)) {
+    # C++ default to parse is 0-length string array
+    col_names <- character(0)
+  }
+  if (identical(col_names, FALSE)) {
+    CsvReadOptions$create(skip_rows = skip, autogenerate_column_names = TRUE)
+  } else {
+    CsvReadOptions$create(skip_rows = skip, column_names = col_names)
+  }
+}
+
+#' @rdname CsvReadOptions
+#' @usage NULL
+#' @format NULL
+#' @docType class
+#' @export
+CsvParseOptions <- R6Class("CsvParseOptions", inherit = ArrowObject)
+CsvParseOptions$create <- function(delimiter = ",",
+                                   quoting = TRUE,
+                                   quote_char = '"',
+                                   double_quote = TRUE,
+                                   escaping = FALSE,
+                                   escape_char = "\\",
+                                   newlines_in_values = FALSE,
+                                   ignore_empty_lines = TRUE) {
+  csv___ParseOptions__initialize(
+    list(
+      delimiter = delimiter,
+      quoting = quoting,
+      quote_char = quote_char,
+      double_quote = double_quote,
+      escaping = escaping,
+      escape_char = escape_char,
+      newlines_in_values = newlines_in_values,
+      ignore_empty_lines = ignore_empty_lines
+    )
+  )
+}
+
+readr_to_csv_parse_options <- function(delim = ",",
+                                       quote = '"',
+                                       escape_double = TRUE,
+                                       escape_backslash = FALSE,
+                                       skip_empty_rows = TRUE) {
+  # This function translates from the readr argument list to the arrow arg names
+  # TODO: validate inputs
+  CsvParseOptions$create(
+    delimiter = delim,
+    quoting = nzchar(quote),
+    quote_char = quote,
+    double_quote = escape_double,
+    escaping = escape_backslash,
+    escape_char = "\\",
+    newlines_in_values = escape_backslash,
+    ignore_empty_lines = skip_empty_rows
+  )
+}
+
+#' @rdname CsvReadOptions
+#' @usage NULL
+#' @format NULL
+#' @docType class
+#' @export
+TimestampParser <- R6Class("TimestampParser",
+  inherit = ArrowObject,
+  public = list(
+    kind = function() TimestampParser__kind(self),
+    format = function() TimestampParser__format(self)
+  )
+)
+TimestampParser$create <- function(format = NULL) {
+  if (is.null(format)) {
+    TimestampParser__MakeISO8601()
+  } else {
+    TimestampParser__MakeStrptime(format)
+  }
+}
+
+#' @rdname CsvReadOptions
+#' @usage NULL
+#' @format NULL
+#' @docType class
+#' @export
+CsvConvertOptions <- R6Class("CsvConvertOptions", inherit = ArrowObject)
+CsvConvertOptions$create <- function(check_utf8 = TRUE,
+                                     null_values = c("", "NA"),
+                                     true_values = c("T", "true", "TRUE"),
+                                     false_values = c("F", "false", "FALSE"),
+                                     strings_can_be_null = FALSE,
+                                     col_types = NULL,
+                                     auto_dict_encode = FALSE,
+                                     auto_dict_max_cardinality = 50L,
+                                     include_columns = character(),
+                                     include_missing_columns = FALSE,
+                                     timestamp_parsers = NULL) {
+  if (!is.null(col_types) && !inherits(col_types, "Schema")) {
+    abort(c(
+      "Unsupported `col_types` specification.",
+      i = "`col_types` must be NULL, or a <Schema>."
+    ))
+  }
+
+  csv___ConvertOptions__initialize(
+    list(
+      check_utf8 = check_utf8,
+      null_values = null_values,
+      strings_can_be_null = strings_can_be_null,
+      col_types = col_types,
+      true_values = true_values,
+      false_values = false_values,
+      auto_dict_encode = auto_dict_encode,
+      auto_dict_max_cardinality = auto_dict_max_cardinality,
+      include_columns = include_columns,
+      include_missing_columns = include_missing_columns,
+      timestamp_parsers = timestamp_parsers
+    )
+  )
+}
+
+readr_to_csv_convert_options <- function(na,
+                                         quoted_na,
+                                         col_types = NULL,
+                                         col_names = NULL,
+                                         timestamp_parsers = NULL) {
+  include_columns <- character()
+
+  if (is.character(col_types)) {
+    if (length(col_types) != 1L) {
+      abort("`col_types` is a character vector that is not of size 1")
+    }
+    n <- nchar(col_types)
+    specs <- substring(col_types, seq_len(n), seq_len(n))
+    if (!is_bare_character(col_names, n)) {
+      abort("Compact specification for `col_types` requires `col_names`")
+    }
+
+    col_types <- set_names(nm = col_names, map2(specs, col_names, ~ {
+      switch(.x,
+        "c" = utf8(),
+        "i" = int32(),
+        "n" = float64(),
+        "d" = float64(),
+        "l" = bool(),
+        "f" = dictionary(),
+        "D" = date32(),
+        "T" = timestamp(),
+        "t" = time32(),
+        "_" = null(),
+        "-" = null(),
+        "?" = NULL,
+        abort(paste0("Unsupported compact specification: '", .x, "' for column '", .y, "'"))
+      )
+    }))
+    # To "guess" types, omit them from col_types
+    col_types <- keep(col_types, ~ !is.null(.x))
+    col_types <- schema(!!!col_types)
+  }
+
+  if (!is.null(col_types)) {
+    assert_is(col_types, "Schema")
+    # If any columns are null(), drop them
+    # (by specifying the other columns in include_columns)
+    nulls <- map_lgl(col_types$fields, ~ .$type$Equals(null()))
+    if (any(nulls)) {
+      include_columns <- setdiff(col_names, names(col_types)[nulls])
+    }
+  }
+  CsvConvertOptions$create(
+    null_values = na,
+    strings_can_be_null = quoted_na,
+    col_types = col_types,
+    timestamp_parsers = timestamp_parsers,
+    include_columns = include_columns
+  )
+}
+
+#' Write CSV file to disk
+#'
+#' @param x `data.frame`, [RecordBatch], or [Table]
+#' @param sink A string file path, URI, or [OutputStream], or path in a file
+#' system (`SubTreeFileSystem`)
+#' @param include_header Whether to write an initial header line with column names
+#' @param batch_size Maximum number of rows processed at a time. Default is 1024.
+#'
+#' @return The input `x`, invisibly. Note that if `sink` is an [OutputStream],
+#' the stream will be left open.
+#' @export
+#' @examplesIf arrow_available()
+#' tf <- tempfile()
+#' on.exit(unlink(tf))
+#' write_csv_arrow(mtcars, tf)
+#' @include arrow-package.R
+write_csv_arrow <- function(x,
+                            sink,
+                            include_header = TRUE,
+                            batch_size = 1024L) {
+  write_options <- CsvWriteOptions$create(include_header, batch_size)
+
+  x_out <- x
+  if (is.data.frame(x)) {
+    x <- Table$create(x)
+  }
+
+  assert_that(is_writable_table(x))
+
+  if (!inherits(sink, "OutputStream")) {
+    sink <- make_output_stream(sink)
+    on.exit(sink$close())
+  }
+
+  if (inherits(x, "RecordBatch")) {
+    csv___WriteCSV__RecordBatch(x, write_options, sink)
+  } else if (inherits(x, "Table")) {
+    csv___WriteCSV__Table(x, write_options, sink)
+  }
+
+  invisible(x_out)
+}
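For quick reference, a round-trip with the functions added in this file (a
sketch based on the examples above; the temporary path is illustrative):

    library(arrow)
    tf <- tempfile(fileext = ".csv")
    write_csv_arrow(mtcars, tf)          # write a data.frame as CSV
    df <- read_csv_arrow(tf)             # read it back as a data.frame
    stopifnot(nrow(df) == nrow(mtcars))  # the shape survives the round trip
    unlink(tf)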