# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

#' Read a CSV or other delimited file with Arrow
#'
#' These functions use the Arrow C++ CSV reader to read into a `data.frame`.
#' Arrow C++ options have been mapped to argument names that follow those of
#' `readr::read_delim()`, and `col_select` was inspired by `vroom::vroom()`.
#'
#' `read_csv_arrow()` and `read_tsv_arrow()` are wrappers around
#' `read_delim_arrow()` that specify a delimiter.
#'
#' Note that not all `readr` options are currently implemented here. Please file
#' an issue if you encounter one that `arrow` should support.
#'
#' If you need to control Arrow-specific reader parameters that don't have an
#' equivalent in `readr::read_csv()`, you can either provide them in the
#' `parse_options`, `convert_options`, or `read_options` arguments, or you can
#' use [CsvTableReader] directly for lower-level access.
#'
#' @section Specifying column types and names:
#'
#' By default, the CSV reader will infer the column names and data types from
#' the file, but there are a few ways you can specify them directly.
#'
#' One way is to provide an Arrow [Schema] in the `schema` argument,
#' which is an ordered map of column name to type.
#' When provided, it satisfies both the `col_names` and `col_types` arguments.
#' This is good if you know all of this information up front.
#'
#' You can also pass a `Schema` to the `col_types` argument. If you do this,
#' column names will still be inferred from the file unless you also specify
#' `col_names`. In either case, the column names in the `Schema` must match the
#' data's column names, whether they are explicitly provided or inferred. That
#' said, this `Schema` does not have to reference all columns: those omitted
#' will have their types inferred.
#'
#' Alternatively, you can declare column types by providing the compact string
#' representation that `readr` uses to the `col_types` argument. This means you
#' provide a single string, one character per column, where the characters map
#' to Arrow types analogously to the `readr` type mapping:
#'
#' * "c": `utf8()`
#' * "i": `int32()`
#' * "n": `float64()`
#' * "d": `float64()`
#' * "l": `bool()`
#' * "f": `dictionary()`
#' * "D": `date32()`
#' * "T": `timestamp()`
#' * "t": `time32()`
#' * "_": `null()`
#' * "-": `null()`
#' * "?": infer the type from the data
#'
#' If you use the compact string representation for `col_types`, you must also
#' specify `col_names`.
#'
#' Regardless of how types are specified, all columns with a `null()` type will
#' be dropped.
#'
#' Note that if you are specifying column names, whether by `schema` or
#' `col_names`, and the CSV file has a header row that would otherwise be used
#' to identify column names, you'll need to add `skip = 1` to skip that row.
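#'
#' For example, here is a minimal sketch of the compact specification; the
#' file name and column names are hypothetical:
#'
#' ```
#' # "cdi" declares three columns: utf8 (string), float64, and int32.
#' # The compact string requires explicit `col_names`, and `skip = 1`
#' # skips the file's header row since we are supplying the names.
#' read_csv_arrow(
#'   "data.csv",
#'   col_types = "cdi",
#'   col_names = c("name", "value", "count"),
#'   skip = 1
#' )
#' ```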
#'
#' @param file A character file name or URI, `raw` vector, an Arrow input stream,
#' or a `FileSystem` with path (`SubTreeFileSystem`).
#' If a file name, a memory-mapped Arrow [InputStream] will be opened and
#' closed when finished; compression will be detected from the file extension
#' and handled automatically. If an input stream is provided, it will be left
#' open.
#' @param delim Single character used to separate fields within a record.
#' @param quote Single character used to quote strings.
#' @param escape_double Does the file escape quotes by doubling them?
#' i.e. If this option is `TRUE`, the value `""""` represents
#' a single quote, `\"`.
#' @param escape_backslash Does the file use backslashes to escape special
#' characters? This is more general than `escape_double` as backslashes
#' can be used to escape the delimiter character, the quote character, or
#' to add special characters like `\\n`.
#' @param schema [Schema] that describes the table. If provided, it will be
#' used to satisfy both `col_names` and `col_types`.
#' @param col_names If `TRUE`, the first row of the input will be used as the
#' column names and will not be included in the data frame. If `FALSE`, column
#' names will be generated by Arrow, starting with "f0", "f1", ..., "fN".
#' Alternatively, you can specify a character vector of column names.
#' @param col_types A compact string representation of the column types, or
#' `NULL` (the default) to infer types from the data.
#' @param col_select A character vector of column names to keep, as in the
#' "select" argument to `data.table::fread()`, or a
#' [tidy selection specification][tidyselect::vars_select()]
#' of columns, as used in `dplyr::select()`.
#' @param na A character vector of strings to interpret as missing values.
#' @param quoted_na Should missing values inside quotes be treated as missing
#' values (the default) or strings? (Note that this is different from the
#' Arrow C++ default for the corresponding convert option,
#' `strings_can_be_null`.)
#' @param skip_empty_rows Should blank rows be ignored altogether? If
#' `TRUE`, blank rows will not be represented at all. If `FALSE`, they will be
#' filled with missings.
#' @param skip Number of lines to skip before reading data.
#' @param timestamp_parsers User-defined timestamp parsers. If more than one
#' parser is specified, the CSV conversion logic will try parsing values
#' starting from the beginning of this vector. Possible values are:
#' - `NULL`: the default, which uses the ISO-8601 parser
#' - a character vector of [strptime][base::strptime()] parse strings
#' - a list of [TimestampParser] objects
#' @param parse_options see [file reader options][CsvReadOptions].
#' If given, this overrides any
#' parsing options provided in other arguments (e.g. `delim`, `quote`, etc.).
#' @param convert_options see [file reader options][CsvReadOptions]
#' @param read_options see [file reader options][CsvReadOptions]
#' @param as_data_frame Should the function return a `data.frame` (default) or
#' an Arrow [Table]?
#'
#' @return A `data.frame`, or a Table if `as_data_frame = FALSE`.
#' @export
#' @examplesIf arrow_available()
#' tf <- tempfile()
#' on.exit(unlink(tf))
#' write.csv(mtcars, file = tf)
#' df <- read_csv_arrow(tf)
#' dim(df)
#' # Can select columns
#' df <- read_csv_arrow(tf, col_select = starts_with("d"))
read_delim_arrow <- function(file,
                             delim = ",",
                             quote = '"',
                             escape_double = TRUE,
                             escape_backslash = FALSE,
                             schema = NULL,
                             col_names = TRUE,
                             col_types = NULL,
                             col_select = NULL,
                             na = c("", "NA"),
                             quoted_na = TRUE,
                             skip_empty_rows = TRUE,
                             skip = 0L,
                             parse_options = NULL,
                             convert_options = NULL,
                             read_options = NULL,
                             as_data_frame = TRUE,
                             timestamp_parsers = NULL) {
  if (inherits(schema, "Schema")) {
    col_names <- names(schema)
    col_types <- schema
  }
  if (is.null(parse_options)) {
    parse_options <- readr_to_csv_parse_options(
      delim,
      quote,
      escape_double,
      escape_backslash,
      skip_empty_rows
    )
  }
  if (is.null(read_options)) {
    read_options <- readr_to_csv_read_options(skip, col_names)
  }
  if (is.null(convert_options)) {
    convert_options <- readr_to_csv_convert_options(
      na,
      quoted_na,
      col_types = col_types,
      col_names = read_options$column_names,
      timestamp_parsers = timestamp_parsers
    )
  }

  if (!inherits(file, "InputStream")) {
    file <- make_readable_file(file)
    on.exit(file$close())
  }

  reader <- CsvTableReader$create(
    file,
    read_options = read_options,
    parse_options = parse_options,
    convert_options = convert_options
  )

  tab <- reader$Read()

  # TODO: move this into convert_options using include_columns
  col_select <- enquo(col_select)
  if (!quo_is_null(col_select)) {
    tab <- tab[vars_select(names(tab), !!col_select)]
  }

  if (isTRUE(as_data_frame)) {
    tab <- as.data.frame(tab)
  }

  tab
}

#' @rdname read_delim_arrow
#' @export
read_csv_arrow <- function(file,
                           quote = '"',
                           escape_double = TRUE,
                           escape_backslash = FALSE,
                           schema = NULL,
                           col_names = TRUE,
                           col_types = NULL,
                           col_select = NULL,
                           na = c("", "NA"),
                           quoted_na = TRUE,
                           skip_empty_rows = TRUE,
                           skip = 0L,
                           parse_options = NULL,
                           convert_options = NULL,
                           read_options = NULL,
                           as_data_frame = TRUE,
                           timestamp_parsers = NULL) {
  # Rewrite the call to read_delim_arrow() with delim = ",", preserving all
  # of the caller's other arguments
  mc <- match.call()
  mc$delim <- ","
  mc[[1]] <- get("read_delim_arrow", envir = asNamespace("arrow"))
  eval.parent(mc)
}

#' @rdname read_delim_arrow
#' @export
read_tsv_arrow <- function(file,
                           quote = '"',
                           escape_double = TRUE,
                           escape_backslash = FALSE,
                           schema = NULL,
                           col_names = TRUE,
                           col_types = NULL,
                           col_select = NULL,
                           na = c("", "NA"),
                           quoted_na = TRUE,
                           skip_empty_rows = TRUE,
                           skip = 0L,
                           parse_options = NULL,
                           convert_options = NULL,
                           read_options = NULL,
                           as_data_frame = TRUE,
                           timestamp_parsers = NULL) {
  # Rewrite the call to read_delim_arrow() with delim = "\t", preserving all
  # of the caller's other arguments
  mc <- match.call()
  mc$delim <- "\t"
  mc[[1]] <- get("read_delim_arrow", envir = asNamespace("arrow"))
  eval.parent(mc)
}

#' @title Arrow CSV and JSON table reader classes
#' @rdname CsvTableReader
#' @name CsvTableReader
#' @docType class
#' @usage NULL
#' @format NULL
#' @description `CsvTableReader` and `JsonTableReader` wrap the Arrow C++ CSV
#' and JSON table readers. See their usage in [read_csv_arrow()] and
#' [read_json_arrow()], respectively.
#'
#' @section Factory:
#'
#' The `CsvTableReader$create()` and `JsonTableReader$create()` factory methods
#' take the following arguments:
#'
#' - `file` An Arrow [InputStream]
#' - `convert_options` (CSV only), `parse_options`, `read_options`: see
#' [CsvReadOptions]
#' - `...` additional parameters.
#'
#' @section Methods:
#'
#' - `$Read()`: returns an Arrow Table.
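#'
#' For example, a minimal sketch of driving the reader directly, assuming
#' `"data.csv"` is an existing file (`mmap_open()` supplies the required
#' [InputStream]):
#'
#' ```
#' f <- mmap_open("data.csv")
#' reader <- CsvTableReader$create(f)
#' tab <- reader$Read()
#' f$close()
#' ```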
#'
#' @include arrow-package.R
#' @export
CsvTableReader <- R6Class("CsvTableReader",
  inherit = ArrowObject,
  public = list(
    Read = function() csv___TableReader__Read(self)
  )
)
CsvTableReader$create <- function(file,
                                  read_options = CsvReadOptions$create(),
                                  parse_options = CsvParseOptions$create(),
                                  convert_options = CsvConvertOptions$create(),
                                  ...) {
  assert_is(file, "InputStream")
  csv___TableReader__Make(file, read_options, parse_options, convert_options)
}

#' @title File reader options
#' @rdname CsvReadOptions
#' @name CsvReadOptions
#' @docType class
#' @usage NULL
#' @format NULL
#' @description `CsvReadOptions`, `CsvParseOptions`, `CsvConvertOptions`,
#' `JsonReadOptions`, `JsonParseOptions`, and `TimestampParser` are containers
#' for various file reading options. See their usage in [read_csv_arrow()] and
#' [read_json_arrow()], respectively.
#'
#' @section Factory:
#'
#' The `CsvReadOptions$create()` and `JsonReadOptions$create()` factory methods
#' take the following arguments:
#'
#' - `use_threads` Whether to use the global CPU thread pool
#' - `block_size` Block size we request from the IO layer; also determines
#' the size of chunks when `use_threads` is `TRUE`. NB: if `use_threads` is
#' `FALSE`, JSON input must end with an empty line.
#'
#' `CsvReadOptions$create()` further accepts these additional arguments:
#'
#' - `skip_rows` Number of lines to skip before reading data (default 0)
#' - `column_names` Character vector to supply column names. If length-0
#' (the default), the first non-skipped row will be parsed to generate column
#' names, unless `autogenerate_column_names` is `TRUE`.
#' - `autogenerate_column_names` Logical: generate column names instead of
#' using the first non-skipped row (the default)? If `TRUE`, column names will
#' be "f0", "f1", ..., "fN".
#'
#' `CsvParseOptions$create()` takes the following arguments:
#'
#' - `delimiter` Field delimiting character (default `","`)
#' - `quoting` Logical: are strings quoted? (default `TRUE`)
#' - `quote_char` Quoting character, if `quoting` is `TRUE`
#' - `double_quote` Logical: are quotes inside values double-quoted? (default `TRUE`)
#' - `escaping` Logical: whether escaping is used (default `FALSE`)
#' - `escape_char` Escaping character, if `escaping` is `TRUE`
#' - `newlines_in_values` Logical: are values allowed to contain CR (`0x0d`)
#' and LF (`0x0a`) characters? (default `FALSE`)
#' - `ignore_empty_lines` Logical: should empty lines be ignored (default) or
#' generate a row of missing values (if `FALSE`)?
#'
#' `JsonParseOptions$create()` accepts only the `newlines_in_values` argument.
#'
#' `CsvConvertOptions$create()` takes the following arguments:
#'
#' - `check_utf8` Logical: check UTF8 validity of string columns? (default `TRUE`)
#' - `null_values` Character vector of recognized spellings for null values.
#' Analogous to the `na.strings` argument to
#' [`read.csv()`][utils::read.csv()] or `na` in `readr::read_csv()`.
#' - `strings_can_be_null` Logical: can string / binary columns have
#' null values? Similar to the `quoted_na` argument to `readr::read_csv()`.
#' (default `FALSE`)
#' - `true_values` Character vector of recognized spellings for `TRUE` values
#' - `false_values` Character vector of recognized spellings for `FALSE` values
#' - `col_types` A `Schema`, or `NULL` to infer types
#' - `auto_dict_encode` Logical: whether to try to automatically
#' dictionary-encode string / binary data (think `stringsAsFactors`).
#' Default `FALSE`. This setting is ignored for non-inferred columns (those in
#' `col_types`).
#' - `auto_dict_max_cardinality` If `auto_dict_encode`, string/binary columns
#' are dictionary-encoded up to this number of unique values (default 50),
#' after which it switches to regular encoding.
#' - `include_columns` If non-empty, indicates the names of columns from the
#' CSV file that should be actually read and converted (in the vector's order).
#' - `include_missing_columns` Logical: if `include_columns` is provided, should
#' columns named in it but not found in the data be included as a column of
#' type `null()`? The default (`FALSE`) means that the reader will instead
#' raise an error.
#' - `timestamp_parsers` User-defined timestamp parsers. If more than one
#' parser is specified, the CSV conversion logic will try parsing values
#' starting from the beginning of this vector. Possible values are
#' (a) `NULL`, the default, which uses the ISO-8601 parser;
#' (b) a character vector of [strptime][base::strptime()] parse strings; or
#' (c) a list of [TimestampParser] objects.
#'
#' `TimestampParser$create()` takes an optional `format` string argument.
#' See [`strptime()`][base::strptime()] for example syntax.
#' The default is to use an ISO-8601 format parser.
#'
#' The `CsvWriteOptions$create()` factory method takes the following arguments:
#'
#' - `include_header` Whether to write an initial header line with column names
#' - `batch_size` Maximum number of rows processed at a time. Default is 1024.
#'
#' @section Active bindings:
#'
#' - `column_names`: from `CsvReadOptions`
#'
#' @export
CsvReadOptions <- R6Class("CsvReadOptions",
  inherit = ArrowObject,
  active = list(
    column_names = function() csv___ReadOptions__column_names(self)
  )
)
CsvReadOptions$create <- function(use_threads = option_use_threads(),
                                  block_size = 1048576L,
                                  skip_rows = 0L,
                                  column_names = character(0),
                                  autogenerate_column_names = FALSE) {
  csv___ReadOptions__initialize(
    list(
      use_threads = use_threads,
      block_size = block_size,
      skip_rows = skip_rows,
      column_names = column_names,
      autogenerate_column_names = autogenerate_column_names
    )
  )
}

#' @rdname CsvReadOptions
#' @export
CsvWriteOptions <- R6Class("CsvWriteOptions", inherit = ArrowObject)
CsvWriteOptions$create <- function(include_header = TRUE, batch_size = 1024L) {
  assert_that(is_integerish(batch_size, n = 1, finite = TRUE), batch_size > 0)
  csv___WriteOptions__initialize(
    list(
      include_header = include_header,
      batch_size = as.integer(batch_size)
    )
  )
}

readr_to_csv_read_options <- function(skip, col_names) {
  if (isTRUE(col_names)) {
    # C++ default to parse is 0-length string array
    col_names <- character(0)
  }
  if (identical(col_names, FALSE)) {
    CsvReadOptions$create(skip_rows = skip, autogenerate_column_names = TRUE)
  } else {
    CsvReadOptions$create(skip_rows = skip, column_names = col_names)
  }
}

#' @rdname CsvReadOptions
#' @usage NULL
#' @format NULL
#' @docType class
#' @export
CsvParseOptions <- R6Class("CsvParseOptions", inherit = ArrowObject)
CsvParseOptions$create <- function(delimiter = ",",
                                   quoting = TRUE,
                                   quote_char = '"',
                                   double_quote = TRUE,
                                   escaping = FALSE,
                                   escape_char = "\\",
                                   newlines_in_values = FALSE,
                                   ignore_empty_lines = TRUE) {
  csv___ParseOptions__initialize(
    list(
      delimiter = delimiter,
      quoting = quoting,
      quote_char = quote_char,
      double_quote = double_quote,
      escaping = escaping,
      escape_char = escape_char,
      newlines_in_values = newlines_in_values,
      ignore_empty_lines = ignore_empty_lines
    )
  )
}
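
# Example (a sketch, kept as a comment so nothing runs at package load):
# parse options for a hypothetical semicolon-delimited file that uses
# backslash escapes rather than doubled quotes.
#   opts <- CsvParseOptions$create(
#     delimiter = ";",
#     double_quote = FALSE,
#     escaping = TRUE
#   )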
readr_to_csv_parse_options <- function(delim = ",",
                                       quote = '"',
                                       escape_double = TRUE,
                                       escape_backslash = FALSE,
                                       skip_empty_rows = TRUE) {
  # This function translates from the readr argument list to the arrow arg names
  # TODO: validate inputs
  CsvParseOptions$create(
    delimiter = delim,
    quoting = nzchar(quote),
    quote_char = quote,
    double_quote = escape_double,
    escaping = escape_backslash,
    escape_char = "\\",
    newlines_in_values = escape_backslash,
    ignore_empty_lines = skip_empty_rows
  )
}

#' @rdname CsvReadOptions
#' @usage NULL
#' @format NULL
#' @docType class
#' @export
TimestampParser <- R6Class("TimestampParser",
  inherit = ArrowObject,
  public = list(
    kind = function() TimestampParser__kind(self),
    format = function() TimestampParser__format(self)
  )
)
TimestampParser$create <- function(format = NULL) {
  if (is.null(format)) {
    TimestampParser__MakeISO8601()
  } else {
    TimestampParser__MakeStrptime(format)
  }
}

#' @rdname CsvReadOptions
#' @usage NULL
#' @format NULL
#' @docType class
#' @export
CsvConvertOptions <- R6Class("CsvConvertOptions", inherit = ArrowObject)
CsvConvertOptions$create <- function(check_utf8 = TRUE,
                                     null_values = c("", "NA"),
                                     true_values = c("T", "true", "TRUE"),
                                     false_values = c("F", "false", "FALSE"),
                                     strings_can_be_null = FALSE,
                                     col_types = NULL,
                                     auto_dict_encode = FALSE,
                                     auto_dict_max_cardinality = 50L,
                                     include_columns = character(),
                                     include_missing_columns = FALSE,
                                     timestamp_parsers = NULL) {
  if (!is.null(col_types) && !inherits(col_types, "Schema")) {
    abort(c(
      "Unsupported `col_types` specification.",
      i = "`col_types` must be `NULL` or a `Schema`."
    ))
  }

  csv___ConvertOptions__initialize(
    list(
      check_utf8 = check_utf8,
      null_values = null_values,
      strings_can_be_null = strings_can_be_null,
      col_types = col_types,
      true_values = true_values,
      false_values = false_values,
      auto_dict_encode = auto_dict_encode,
      auto_dict_max_cardinality = auto_dict_max_cardinality,
      include_columns = include_columns,
      include_missing_columns = include_missing_columns,
      timestamp_parsers = timestamp_parsers
    )
  )
}
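
# Example (a sketch, kept as a comment so nothing runs at package load):
# a strptime-based parser for timestamps like "03/31/2021 12:00"; the format
# string here is hypothetical.
#   ts_parser <- TimestampParser$create(format = "%m/%d/%Y %H:%M")
#   opts <- CsvConvertOptions$create(timestamp_parsers = list(ts_parser))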
readr_to_csv_convert_options <- function(na,
                                         quoted_na,
                                         col_types = NULL,
                                         col_names = NULL,
                                         timestamp_parsers = NULL) {
  include_columns <- character()

  if (is.character(col_types)) {
    if (length(col_types) != 1L) {
      abort("`col_types` is a character vector that is not of size 1")
    }
    n <- nchar(col_types)
    specs <- substring(col_types, seq_len(n), seq_len(n))
    if (!is_bare_character(col_names, n)) {
      abort("Compact specification for `col_types` requires `col_names`")
    }

    col_types <- set_names(nm = col_names, map2(specs, col_names, ~ {
      switch(.x,
        "c" = utf8(),
        "i" = int32(),
        "n" = float64(),
        "d" = float64(),
        "l" = bool(),
        "f" = dictionary(),
        "D" = date32(),
        "T" = timestamp(),
        "t" = time32(),
        "_" = null(),
        "-" = null(),
        "?" = NULL,
        abort(paste0(
          "Unsupported compact specification: '", .x,
          "' for column '", .y, "'"
        ))
      )
    }))
    # To "guess" types, omit them from col_types
    col_types <- keep(col_types, ~ !is.null(.x))
    col_types <- schema(!!!col_types)
  }

  if (!is.null(col_types)) {
    assert_is(col_types, "Schema")
    # If any columns are null(), drop them
    # (by specifying the other columns in include_columns)
    nulls <- map_lgl(col_types$fields, ~ .$type$Equals(null()))
    if (any(nulls)) {
      include_columns <- setdiff(col_names, names(col_types)[nulls])
    }
  }
  CsvConvertOptions$create(
    null_values = na,
    strings_can_be_null = quoted_na,
    col_types = col_types,
    timestamp_parsers = timestamp_parsers,
    include_columns = include_columns
  )
}

#' Write CSV file to disk
#'
#' @param x `data.frame`, [RecordBatch], or [Table]
#' @param sink A string file path, URI, or [OutputStream], or path in a file
#' system (`SubTreeFileSystem`)
#' @param include_header Whether to write an initial header line with column names
#' @param batch_size Maximum number of rows processed at a time. Default is 1024.
#'
#' @return The input `x`, invisibly. Note that if `sink` is an [OutputStream],
#' the stream will be left open.
#' @export
#' @examplesIf arrow_available()
#' tf <- tempfile()
#' on.exit(unlink(tf))
#' write_csv_arrow(mtcars, tf)
#' @include arrow-package.R
write_csv_arrow <- function(x, sink, include_header = TRUE, batch_size = 1024L) {
  write_options <- CsvWriteOptions$create(include_header, batch_size)

  x_out <- x
  if (is.data.frame(x)) {
    x <- Table$create(x)
  }

  assert_that(is_writable_table(x))

  if (!inherits(sink, "OutputStream")) {
    sink <- make_output_stream(sink)
    on.exit(sink$close())
  }

  if (inherits(x, "RecordBatch")) {
    csv___WriteCSV__RecordBatch(x, write_options, sink)
  } else if (inherits(x, "Table")) {
    csv___WriteCSV__Table(x, write_options, sink)
  }

  invisible(x_out)
}
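
# Example (a sketch, kept as a comment so nothing runs at package load):
# writing without a header row; the output path is hypothetical.
#   write_csv_arrow(mtcars, "out.csv", include_header = FALSE)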