summaryrefslogtreecommitdiffstats
path: root/src/arrow/r/R/csv.R
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/r/R/csv.R
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/r/R/csv.R')
-rw-r--r--src/arrow/r/R/csv.R644
1 files changed, 644 insertions, 0 deletions
diff --git a/src/arrow/r/R/csv.R b/src/arrow/r/R/csv.R
new file mode 100644
index 000000000..ee890578f
--- /dev/null
+++ b/src/arrow/r/R/csv.R
@@ -0,0 +1,644 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#' Read a CSV or other delimited file with Arrow
+#'
+#' These functions uses the Arrow C++ CSV reader to read into a `data.frame`.
+#' Arrow C++ options have been mapped to argument names that follow those of
+#' `readr::read_delim()`, and `col_select` was inspired by `vroom::vroom()`.
+#'
+#' `read_csv_arrow()` and `read_tsv_arrow()` are wrappers around
+#' `read_delim_arrow()` that specify a delimiter.
+#'
+#' Note that not all `readr` options are currently implemented here. Please file
+#' an issue if you encounter one that `arrow` should support.
+#'
+#' If you need to control Arrow-specific reader parameters that don't have an
+#' equivalent in `readr::read_csv()`, you can either provide them in the
+#' `parse_options`, `convert_options`, or `read_options` arguments, or you can
+#' use [CsvTableReader] directly for lower-level access.
+#'
+#' @section Specifying column types and names:
+#'
+#' By default, the CSV reader will infer the column names and data types from the file, but there
+#' are a few ways you can specify them directly.
+#'
+#' One way is to provide an Arrow [Schema] in the `schema` argument,
+#' which is an ordered map of column name to type.
+#' When provided, it satisfies both the `col_names` and `col_types` arguments.
+#' This is good if you know all of this information up front.
+#'
+#' You can also pass a `Schema` to the `col_types` argument. If you do this,
+#' column names will still be inferred from the file unless you also specify
+#' `col_names`. In either case, the column names in the `Schema` must match the
+#' data's column names, whether they are explicitly provided or inferred. That
+#' said, this `Schema` does not have to reference all columns: those omitted
+#' will have their types inferred.
+#'
+#' Alternatively, you can declare column types by providing the compact string representation
+#' that `readr` uses to the `col_types` argument. This means you provide a
+#' single string, one character per column, where the characters map to Arrow
+#' types analogously to the `readr` type mapping:
+#'
+#' * "c": `utf8()`
+#' * "i": `int32()`
+#' * "n": `float64()`
+#' * "d": `float64()`
+#' * "l": `bool()`
+#' * "f": `dictionary()`
+#' * "D": `date32()`
+#' * "T": `timestamp()`
+#' * "t": `time32()`
+#' * "_": `null()`
+#' * "-": `null()`
+#' * "?": infer the type from the data
+#'
+#' If you use the compact string representation for `col_types`, you must also
+#' specify `col_names`.
+#'
+#' Regardless of how types are specified, all columns with a `null()` type will
+#' be dropped.
+#'
+#' Note that if you are specifying column names, whether by `schema` or
+#' `col_names`, and the CSV file has a header row that would otherwise be used
+#' to idenfity column names, you'll need to add `skip = 1` to skip that row.
+#'
+#' @param file A character file name or URI, `raw` vector, an Arrow input stream,
+#' or a `FileSystem` with path (`SubTreeFileSystem`).
+#' If a file name, a memory-mapped Arrow [InputStream] will be opened and
+#' closed when finished; compression will be detected from the file extension
+#' and handled automatically. If an input stream is provided, it will be left
+#' open.
+#' @param delim Single character used to separate fields within a record.
+#' @param quote Single character used to quote strings.
+#' @param escape_double Does the file escape quotes by doubling them?
+#' i.e. If this option is `TRUE`, the value `""""` represents
+#' a single quote, `\"`.
+#' @param escape_backslash Does the file use backslashes to escape special
+#' characters? This is more general than `escape_double` as backslashes
+#' can be used to escape the delimiter character, the quote character, or
+#' to add special characters like `\\n`.
+#' @param schema [Schema] that describes the table. If provided, it will be
+#' used to satisfy both `col_names` and `col_types`.
+#' @param col_names If `TRUE`, the first row of the input will be used as the
+#' column names and will not be included in the data frame. If `FALSE`, column
+#' names will be generated by Arrow, starting with "f0", "f1", ..., "fN".
+#' Alternatively, you can specify a character vector of column names.
+#' @param col_types A compact string representation of the column types, or
+#' `NULL` (the default) to infer types from the data.
+#' @param col_select A character vector of column names to keep, as in the
+#' "select" argument to `data.table::fread()`, or a
+#' [tidy selection specification][tidyselect::vars_select()]
+#' of columns, as used in `dplyr::select()`.
+#' @param na A character vector of strings to interpret as missing values.
+#' @param quoted_na Should missing values inside quotes be treated as missing
+#' values (the default) or strings. (Note that this is different from the
+#' the Arrow C++ default for the corresponding convert option,
+#' `strings_can_be_null`.)
+#' @param skip_empty_rows Should blank rows be ignored altogether? If
+#' `TRUE`, blank rows will not be represented at all. If `FALSE`, they will be
+#' filled with missings.
+#' @param skip Number of lines to skip before reading data.
+#' @param timestamp_parsers User-defined timestamp parsers. If more than one
+#' parser is specified, the CSV conversion logic will try parsing values
+#' starting from the beginning of this vector. Possible values are:
+#' - `NULL`: the default, which uses the ISO-8601 parser
+#' - a character vector of [strptime][base::strptime()] parse strings
+#' - a list of [TimestampParser] objects
+#' @param parse_options see [file reader options][CsvReadOptions].
+#' If given, this overrides any
+#' parsing options provided in other arguments (e.g. `delim`, `quote`, etc.).
+#' @param convert_options see [file reader options][CsvReadOptions]
+#' @param read_options see [file reader options][CsvReadOptions]
+#' @param as_data_frame Should the function return a `data.frame` (default) or
+#' an Arrow [Table]?
+#'
+#' @return A `data.frame`, or a Table if `as_data_frame = FALSE`.
+#' @export
+#' @examplesIf arrow_available()
+#' tf <- tempfile()
+#' on.exit(unlink(tf))
+#' write.csv(mtcars, file = tf)
+#' df <- read_csv_arrow(tf)
+#' dim(df)
+#' # Can select columns
+#' df <- read_csv_arrow(tf, col_select = starts_with("d"))
+read_delim_arrow <- function(file,
+ delim = ",",
+ quote = '"',
+ escape_double = TRUE,
+ escape_backslash = FALSE,
+ schema = NULL,
+ col_names = TRUE,
+ col_types = NULL,
+ col_select = NULL,
+ na = c("", "NA"),
+ quoted_na = TRUE,
+ skip_empty_rows = TRUE,
+ skip = 0L,
+ parse_options = NULL,
+ convert_options = NULL,
+ read_options = NULL,
+ as_data_frame = TRUE,
+ timestamp_parsers = NULL) {
+ if (inherits(schema, "Schema")) {
+ col_names <- names(schema)
+ col_types <- schema
+ }
+ if (is.null(parse_options)) {
+ parse_options <- readr_to_csv_parse_options(
+ delim,
+ quote,
+ escape_double,
+ escape_backslash,
+ skip_empty_rows
+ )
+ }
+ if (is.null(read_options)) {
+ read_options <- readr_to_csv_read_options(skip, col_names)
+ }
+ if (is.null(convert_options)) {
+ convert_options <- readr_to_csv_convert_options(
+ na,
+ quoted_na,
+ col_types = col_types,
+ col_names = read_options$column_names,
+ timestamp_parsers = timestamp_parsers
+ )
+ }
+
+ if (!inherits(file, "InputStream")) {
+ file <- make_readable_file(file)
+ on.exit(file$close())
+ }
+ reader <- CsvTableReader$create(
+ file,
+ read_options = read_options,
+ parse_options = parse_options,
+ convert_options = convert_options
+ )
+
+ tab <- reader$Read()
+
+ # TODO: move this into convert_options using include_columns
+ col_select <- enquo(col_select)
+ if (!quo_is_null(col_select)) {
+ tab <- tab[vars_select(names(tab), !!col_select)]
+ }
+
+ if (isTRUE(as_data_frame)) {
+ tab <- as.data.frame(tab)
+ }
+
+ tab
+}
+
+#' @rdname read_delim_arrow
+#' @export
+read_csv_arrow <- function(file,
+ quote = '"',
+ escape_double = TRUE,
+ escape_backslash = FALSE,
+ schema = NULL,
+ col_names = TRUE,
+ col_types = NULL,
+ col_select = NULL,
+ na = c("", "NA"),
+ quoted_na = TRUE,
+ skip_empty_rows = TRUE,
+ skip = 0L,
+ parse_options = NULL,
+ convert_options = NULL,
+ read_options = NULL,
+ as_data_frame = TRUE,
+ timestamp_parsers = NULL) {
+ mc <- match.call()
+ mc$delim <- ","
+ mc[[1]] <- get("read_delim_arrow", envir = asNamespace("arrow"))
+ eval.parent(mc)
+}
+
+#' @rdname read_delim_arrow
+#' @export
+read_tsv_arrow <- function(file,
+ quote = '"',
+ escape_double = TRUE,
+ escape_backslash = FALSE,
+ schema = NULL,
+ col_names = TRUE,
+ col_types = NULL,
+ col_select = NULL,
+ na = c("", "NA"),
+ quoted_na = TRUE,
+ skip_empty_rows = TRUE,
+ skip = 0L,
+ parse_options = NULL,
+ convert_options = NULL,
+ read_options = NULL,
+ as_data_frame = TRUE,
+ timestamp_parsers = NULL) {
+ mc <- match.call()
+ mc$delim <- "\t"
+ mc[[1]] <- get("read_delim_arrow", envir = asNamespace("arrow"))
+ eval.parent(mc)
+}
+
+#' @title Arrow CSV and JSON table reader classes
+#' @rdname CsvTableReader
+#' @name CsvTableReader
+#' @docType class
+#' @usage NULL
+#' @format NULL
+#' @description `CsvTableReader` and `JsonTableReader` wrap the Arrow C++ CSV
+#' and JSON table readers. See their usage in [read_csv_arrow()] and
+#' [read_json_arrow()], respectively.
+#'
+#' @section Factory:
+#'
+#' The `CsvTableReader$create()` and `JsonTableReader$create()` factory methods
+#' take the following arguments:
+#'
+#' - `file` An Arrow [InputStream]
+#' - `convert_options` (CSV only), `parse_options`, `read_options`: see
+#' [CsvReadOptions]
+#' - `...` additional parameters.
+#'
+#' @section Methods:
+#'
+#' - `$Read()`: returns an Arrow Table.
+#'
+#' @include arrow-package.R
+#' @export
+CsvTableReader <- R6Class("CsvTableReader",
+ inherit = ArrowObject,
+ public = list(
+ Read = function() csv___TableReader__Read(self)
+ )
+)
+CsvTableReader$create <- function(file,
+ read_options = CsvReadOptions$create(),
+ parse_options = CsvParseOptions$create(),
+ convert_options = CsvConvertOptions$create(),
+ ...) {
+ assert_is(file, "InputStream")
+ csv___TableReader__Make(file, read_options, parse_options, convert_options)
+}
+
+#' @title File reader options
+#' @rdname CsvReadOptions
+#' @name CsvReadOptions
+#' @docType class
+#' @usage NULL
+#' @format NULL
+#' @description `CsvReadOptions`, `CsvParseOptions`, `CsvConvertOptions`,
+#' `JsonReadOptions`, `JsonParseOptions`, and `TimestampParser` are containers for various
+#' file reading options. See their usage in [read_csv_arrow()] and
+#' [read_json_arrow()], respectively.
+#'
+#' @section Factory:
+#'
+#' The `CsvReadOptions$create()` and `JsonReadOptions$create()` factory methods
+#' take the following arguments:
+#'
+#' - `use_threads` Whether to use the global CPU thread pool
+#' - `block_size` Block size we request from the IO layer; also determines
+#' the size of chunks when use_threads is `TRUE`. NB: if `FALSE`, JSON input
+#' must end with an empty line.
+#'
+#' `CsvReadOptions$create()` further accepts these additional arguments:
+#'
+#' - `skip_rows` Number of lines to skip before reading data (default 0)
+#' - `column_names` Character vector to supply column names. If length-0
+#' (the default), the first non-skipped row will be parsed to generate column
+#' names, unless `autogenerate_column_names` is `TRUE`.
+#' - `autogenerate_column_names` Logical: generate column names instead of
+#' using the first non-skipped row (the default)? If `TRUE`, column names will
+#' be "f0", "f1", ..., "fN".
+#'
+#' `CsvParseOptions$create()` takes the following arguments:
+#'
+#' - `delimiter` Field delimiting character (default `","`)
+#' - `quoting` Logical: are strings quoted? (default `TRUE`)
+#' - `quote_char` Quoting character, if `quoting` is `TRUE`
+#' - `double_quote` Logical: are quotes inside values double-quoted? (default `TRUE`)
+#' - `escaping` Logical: whether escaping is used (default `FALSE`)
+#' - `escape_char` Escaping character, if `escaping` is `TRUE`
+#' - `newlines_in_values` Logical: are values allowed to contain CR (`0x0d`)
+#' and LF (`0x0a`) characters? (default `FALSE`)
+#' - `ignore_empty_lines` Logical: should empty lines be ignored (default) or
+#' generate a row of missing values (if `FALSE`)?
+#'
+#' `JsonParseOptions$create()` accepts only the `newlines_in_values` argument.
+#'
+#' `CsvConvertOptions$create()` takes the following arguments:
+#'
+#' - `check_utf8` Logical: check UTF8 validity of string columns? (default `TRUE`)
+#' - `null_values` character vector of recognized spellings for null values.
+#' Analogous to the `na.strings` argument to
+#' [`read.csv()`][utils::read.csv()] or `na` in `readr::read_csv()`.
+#' - `strings_can_be_null` Logical: can string / binary columns have
+#' null values? Similar to the `quoted_na` argument to `readr::read_csv()`.
+#' (default `FALSE`)
+#' - `true_values` character vector of recognized spellings for `TRUE` values
+#' - `false_values` character vector of recognized spellings for `FALSE` values
+#' - `col_types` A `Schema` or `NULL` to infer types
+#' - `auto_dict_encode` Logical: Whether to try to automatically
+#' dictionary-encode string / binary data (think `stringsAsFactors`). Default `FALSE`.
+#' This setting is ignored for non-inferred columns (those in `col_types`).
+#' - `auto_dict_max_cardinality` If `auto_dict_encode`, string/binary columns
+#' are dictionary-encoded up to this number of unique values (default 50),
+#' after which it switches to regular encoding.
+#' - `include_columns` If non-empty, indicates the names of columns from the
+#' CSV file that should be actually read and converted (in the vector's order).
+#' - `include_missing_columns` Logical: if `include_columns` is provided, should
+#' columns named in it but not found in the data be included as a column of
+#' type `null()`? The default (`FALSE`) means that the reader will instead
+#' raise an error.
+#' - `timestamp_parsers` User-defined timestamp parsers. If more than one
+#' parser is specified, the CSV conversion logic will try parsing values
+#' starting from the beginning of this vector. Possible values are
+#' (a) `NULL`, the default, which uses the ISO-8601 parser;
+#' (b) a character vector of [strptime][base::strptime()] parse strings; or
+#' (c) a list of [TimestampParser] objects.
+#'
+#' `TimestampParser$create()` takes an optional `format` string argument.
+#' See [`strptime()`][base::strptime()] for example syntax.
+#' The default is to use an ISO-8601 format parser.
+#'
+#' The `CsvWriteOptions$create()` factory method takes the following arguments:
+#' - `include_header` Whether to write an initial header line with column names
+#' - `batch_size` Maximum number of rows processed at a time. Default is 1024.
+#'
+#' @section Active bindings:
+#'
+#' - `column_names`: from `CsvReadOptions`
+#'
+#' @export
+CsvReadOptions <- R6Class("CsvReadOptions",
+ inherit = ArrowObject,
+ active = list(
+ column_names = function() csv___ReadOptions__column_names(self)
+ )
+)
+CsvReadOptions$create <- function(use_threads = option_use_threads(),
+ block_size = 1048576L,
+ skip_rows = 0L,
+ column_names = character(0),
+ autogenerate_column_names = FALSE) {
+ csv___ReadOptions__initialize(
+ list(
+ use_threads = use_threads,
+ block_size = block_size,
+ skip_rows = skip_rows,
+ column_names = column_names,
+ autogenerate_column_names = autogenerate_column_names
+ )
+ )
+}
+
+#' @rdname CsvReadOptions
+#' @export
+CsvWriteOptions <- R6Class("CsvWriteOptions", inherit = ArrowObject)
+CsvWriteOptions$create <- function(include_header = TRUE, batch_size = 1024L) {
+ assert_that(is_integerish(batch_size, n = 1, finite = TRUE), batch_size > 0)
+ csv___WriteOptions__initialize(
+ list(
+ include_header = include_header,
+ batch_size = as.integer(batch_size)
+ )
+ )
+}
+
+readr_to_csv_read_options <- function(skip, col_names, col_types) {
+ if (isTRUE(col_names)) {
+ # C++ default to parse is 0-length string array
+ col_names <- character(0)
+ }
+ if (identical(col_names, FALSE)) {
+ CsvReadOptions$create(skip_rows = skip, autogenerate_column_names = TRUE)
+ } else {
+ CsvReadOptions$create(skip_rows = skip, column_names = col_names)
+ }
+}
+
+#' @rdname CsvReadOptions
+#' @usage NULL
+#' @format NULL
+#' @docType class
+#' @export
+CsvParseOptions <- R6Class("CsvParseOptions", inherit = ArrowObject)
+CsvParseOptions$create <- function(delimiter = ",",
+ quoting = TRUE,
+ quote_char = '"',
+ double_quote = TRUE,
+ escaping = FALSE,
+ escape_char = "\\",
+ newlines_in_values = FALSE,
+ ignore_empty_lines = TRUE) {
+ csv___ParseOptions__initialize(
+ list(
+ delimiter = delimiter,
+ quoting = quoting,
+ quote_char = quote_char,
+ double_quote = double_quote,
+ escaping = escaping,
+ escape_char = escape_char,
+ newlines_in_values = newlines_in_values,
+ ignore_empty_lines = ignore_empty_lines
+ )
+ )
+}
+
+readr_to_csv_parse_options <- function(delim = ",",
+ quote = '"',
+ escape_double = TRUE,
+ escape_backslash = FALSE,
+ skip_empty_rows = TRUE) {
+ # This function translates from the readr argument list to the arrow arg names
+ # TODO: validate inputs
+ CsvParseOptions$create(
+ delimiter = delim,
+ quoting = nzchar(quote),
+ quote_char = quote,
+ double_quote = escape_double,
+ escaping = escape_backslash,
+ escape_char = "\\",
+ newlines_in_values = escape_backslash,
+ ignore_empty_lines = skip_empty_rows
+ )
+}
+
+#' @rdname CsvReadOptions
+#' @usage NULL
+#' @format NULL
+#' @docType class
+#' @export
+TimestampParser <- R6Class("TimestampParser",
+ inherit = ArrowObject,
+ public = list(
+ kind = function() TimestampParser__kind(self),
+ format = function() TimestampParser__format(self)
+ )
+)
+TimestampParser$create <- function(format = NULL) {
+ if (is.null(format)) {
+ TimestampParser__MakeISO8601()
+ } else {
+ TimestampParser__MakeStrptime(format)
+ }
+}
+
+#' @rdname CsvReadOptions
+#' @usage NULL
+#' @format NULL
+#' @docType class
+#' @export
+CsvConvertOptions <- R6Class("CsvConvertOptions", inherit = ArrowObject)
+CsvConvertOptions$create <- function(check_utf8 = TRUE,
+ null_values = c("", "NA"),
+ true_values = c("T", "true", "TRUE"),
+ false_values = c("F", "false", "FALSE"),
+ strings_can_be_null = FALSE,
+ col_types = NULL,
+ auto_dict_encode = FALSE,
+ auto_dict_max_cardinality = 50L,
+ include_columns = character(),
+ include_missing_columns = FALSE,
+ timestamp_parsers = NULL) {
+ if (!is.null(col_types) && !inherits(col_types, "Schema")) {
+ abort(c(
+ "Unsupported `col_types` specification.",
+ i = "`col_types` must be NULL, or a <Schema>."
+ ))
+ }
+
+ csv___ConvertOptions__initialize(
+ list(
+ check_utf8 = check_utf8,
+ null_values = null_values,
+ strings_can_be_null = strings_can_be_null,
+ col_types = col_types,
+ true_values = true_values,
+ false_values = false_values,
+ auto_dict_encode = auto_dict_encode,
+ auto_dict_max_cardinality = auto_dict_max_cardinality,
+ include_columns = include_columns,
+ include_missing_columns = include_missing_columns,
+ timestamp_parsers = timestamp_parsers
+ )
+ )
+}
+
+readr_to_csv_convert_options <- function(na,
+ quoted_na,
+ col_types = NULL,
+ col_names = NULL,
+ timestamp_parsers = NULL) {
+ include_columns <- character()
+
+ if (is.character(col_types)) {
+ if (length(col_types) != 1L) {
+ abort("`col_types` is a character vector that is not of size 1")
+ }
+ n <- nchar(col_types)
+ specs <- substring(col_types, seq_len(n), seq_len(n))
+ if (!is_bare_character(col_names, n)) {
+ abort("Compact specification for `col_types` requires `col_names`")
+ }
+
+ col_types <- set_names(nm = col_names, map2(specs, col_names, ~ {
+ switch(.x,
+ "c" = utf8(),
+ "i" = int32(),
+ "n" = float64(),
+ "d" = float64(),
+ "l" = bool(),
+ "f" = dictionary(),
+ "D" = date32(),
+ "T" = timestamp(),
+ "t" = time32(),
+ "_" = null(),
+ "-" = null(),
+ "?" = NULL,
+ abort("Unsupported compact specification: '", .x, "' for column '", .y, "'")
+ )
+ }))
+ # To "guess" types, omit them from col_types
+ col_types <- keep(col_types, ~ !is.null(.x))
+ col_types <- schema(!!!col_types)
+ }
+
+ if (!is.null(col_types)) {
+ assert_is(col_types, "Schema")
+ # If any columns are null(), drop them
+ # (by specifying the other columns in include_columns)
+ nulls <- map_lgl(col_types$fields, ~ .$type$Equals(null()))
+ if (any(nulls)) {
+ include_columns <- setdiff(col_names, names(col_types)[nulls])
+ }
+ }
+ CsvConvertOptions$create(
+ null_values = na,
+ strings_can_be_null = quoted_na,
+ col_types = col_types,
+ timestamp_parsers = timestamp_parsers,
+ include_columns = include_columns
+ )
+}
+
+#' Write CSV file to disk
+#'
+#' @param x `data.frame`, [RecordBatch], or [Table]
+#' @param sink A string file path, URI, or [OutputStream], or path in a file
+#' system (`SubTreeFileSystem`)
+#' @param include_header Whether to write an initial header line with column names
+#' @param batch_size Maximum number of rows processed at a time. Default is 1024.
+#'
+#' @return The input `x`, invisibly. Note that if `sink` is an [OutputStream],
+#' the stream will be left open.
+#' @export
+#' @examplesIf arrow_available()
+#' tf <- tempfile()
+#' on.exit(unlink(tf))
+#' write_csv_arrow(mtcars, tf)
+#' @include arrow-package.R
+write_csv_arrow <- function(x,
+ sink,
+ include_header = TRUE,
+ batch_size = 1024L) {
+ write_options <- CsvWriteOptions$create(include_header, batch_size)
+
+ x_out <- x
+ if (is.data.frame(x)) {
+ x <- Table$create(x)
+ }
+
+ assert_that(is_writable_table(x))
+
+ if (!inherits(sink, "OutputStream")) {
+ sink <- make_output_stream(sink)
+ on.exit(sink$close())
+ }
+
+ if (inherits(x, "RecordBatch")) {
+ csv___WriteCSV__RecordBatch(x, write_options, sink)
+ } else if (inherits(x, "Table")) {
+ csv___WriteCSV__Table(x, write_options, sink)
+ }
+
+ invisible(x_out)
+}