path: root/src/arrow/r/R/dataset-write.R
Diffstat
-rw-r--r--  src/arrow/r/R/dataset-write.R  |  144
1 file changed, 144 insertions, 0 deletions
diff --git a/src/arrow/r/R/dataset-write.R b/src/arrow/r/R/dataset-write.R
new file mode 100644
index 000000000..3a98357b0
--- /dev/null
+++ b/src/arrow/r/R/dataset-write.R
@@ -0,0 +1,144 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#' Write a dataset
+#'
+#' This function allows you to write a dataset. By writing to more efficient
+#' binary storage formats, and by specifying relevant partitioning, you can
+#' make the dataset much faster to read and query.
+#'
+#' @param dataset [Dataset], [RecordBatch], [Table], `arrow_dplyr_query`, or
+#'   `data.frame`. If an `arrow_dplyr_query`, the query will be evaluated and
+#'   the result will be written. This means that you can `select()`, `filter()`,
+#'   `mutate()`, etc. to transform the data before it is written if you need to.
+#' @param path string path, URI, or `SubTreeFileSystem` referencing a directory
+#'   to write to (directory will be created if it does not exist)
+#' @param format a string identifier of the file format. Default is to use
+#'   "parquet" (see [FileFormat])
+#' @param partitioning `Partitioning` or a character vector of columns to
+#'   use as partition keys (to be written as path segments). Default is to
+#'   use the current `group_by()` columns.
+#' @param basename_template string template for the names of files to be written.
+#'   Must contain `"{i}"`, which will be replaced with an autoincremented
+#'   integer to generate basenames of datafiles. For example, `"part-{i}.feather"`
+#'   will yield `"part-0.feather", ...`. See the examples below.
+#' @param hive_style logical: write partition segments as Hive-style
+#'   (`key1=value1/key2=value2/file.ext`) or as just bare values. Default is `TRUE`.
+#' @param existing_data_behavior The behavior to use when there is already data
+#'   in the destination directory. Must be one of "overwrite", "error", or
+#'   "delete_matching" (see the examples below).
+#'   - "overwrite" (the default): any new files created will overwrite
+#'     existing files
+#'   - "error": the operation will fail if the destination directory is not
+#'     empty
+#'   - "delete_matching": the writer will delete any existing partitions
+#'     to which data is going to be written and will leave alone partitions
+#'     to which no data is written.
+#' @param ... additional format-specific arguments. For available Parquet
+#'   options, see [write_parquet()]. The available Feather options are:
+#'   - `use_legacy_format` logical: write data formatted so that Arrow libraries
+#'     versions 0.14 and lower can read it. Default is `FALSE`. You can also
+#'     enable this by setting the environment variable `ARROW_PRE_0_15_IPC_FORMAT=1`.
+#'   - `metadata_version`: A string like "V5" or the equivalent integer indicating
+#'     the Arrow IPC MetadataVersion. Default (NULL) will use the latest version,
+#'     unless the environment variable `ARROW_PRE_1_0_METADATA_VERSION=1` is set,
+#'     in which case it will be V4.
+#'   - `codec`: A [Codec] which will be used to compress body buffers of written
+#'     files. Default (NULL) will not compress body buffers.
+#'   - `null_fallback`: character to be used in place of missing values (`NA` or
+#'     `NULL`) when using Hive-style partitioning. See [hive_partition()].
+#' @return The input `dataset`, invisibly
+#' @examplesIf arrow_with_dataset() & arrow_with_parquet() & requireNamespace("dplyr", quietly = TRUE)
+#' # You can write datasets partitioned by the values in a column (here: "cyl").
+#' # This creates a structure of the form cyl=X/part-Z.parquet.
+#' one_level_tree <- tempfile()
+#' write_dataset(mtcars, one_level_tree, partitioning = "cyl")
+#' list.files(one_level_tree, recursive = TRUE)
+#'
+#' # You can also partition by the values in multiple columns
+#' # (here: "cyl" and "gear").
+#' # This creates a structure of the form cyl=X/gear=Y/part-Z.parquet.
+#' two_levels_tree <- tempfile()
+#' write_dataset(mtcars, two_levels_tree, partitioning = c("cyl", "gear"))
+#' list.files(two_levels_tree, recursive = TRUE)
+#'
+#' # In the two previous examples we would have:
+#' # X = {4,6,8}, the number of cylinders.
+#' # Y = {3,4,5}, the number of forward gears.
+#' # Z = {0,1,2}, the number of saved parts, starting from 0.
+#'
+#' # You can obtain the same result as the previous examples using arrow with
+#' # a dplyr pipeline. This will be the same as two_levels_tree above, but the
+#' # output directory will be different.
+#' library(dplyr)
+#' two_levels_tree_2 <- tempfile()
+#' mtcars %>%
+#' group_by(cyl, gear) %>%
+#' write_dataset(two_levels_tree_2)
+#' list.files(two_levels_tree_2, recursive = TRUE)
+#'
+#' # And you can also turn off the Hive-style directory naming (where the
+#' # column name is included with the values) by using `hive_style = FALSE`.
+#'
+#' # Write a structure X/Y/part-Z.parquet.
+#' two_levels_tree_no_hive <- tempfile()
+#' mtcars %>%
+#' group_by(cyl, gear) %>%
+#' write_dataset(two_levels_tree_no_hive, hive_style = FALSE)
+#' list.files(two_levels_tree_no_hive, recursive = TRUE)
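+#'
+#' # For illustration, `basename_template` controls the file names within each
+#' # partition directory: "{i}" is replaced with an auto-incremented integer,
+#' # so this writes "chunk-0.parquet" and so on instead of "part-0.parquet".
+#' custom_name_tree <- tempfile()
+#' write_dataset(mtcars, custom_name_tree,
+#'   partitioning = "cyl", basename_template = "chunk-{i}.parquet"
+#' )
+#' list.files(custom_name_tree, recursive = TRUE)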
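+#'
+#' # `existing_data_behavior` controls what happens when the destination
+#' # directory already holds data. With "error", writing into the non-empty
+#' # directory from the first example fails (wrapped in try() here):
+#' try(write_dataset(mtcars, one_level_tree, existing_data_behavior = "error"))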
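+#'
+#' # Formats other than Parquet are selected with `format`; format-specific
+#' # options described above under `...` can be passed as extra arguments.
+#' feather_dir <- tempfile()
+#' write_dataset(mtcars, feather_dir, format = "feather", partitioning = "cyl")
+#' list.files(feather_dir, recursive = TRUE)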
+#' @export
+write_dataset <- function(dataset,
+                          path,
+                          format = c("parquet", "feather", "arrow", "ipc", "csv"),
+                          partitioning = dplyr::group_vars(dataset),
+                          basename_template = paste0("part-{i}.", as.character(format)),
+                          hive_style = TRUE,
+                          existing_data_behavior = c("overwrite", "error", "delete_matching"),
+                          ...) {
+  format <- match.arg(format)
+  if (inherits(dataset, "arrow_dplyr_query")) {
+    # partitioning vars need to be in the `select` schema
+    dataset <- ensure_group_vars(dataset)
+  } else if (inherits(dataset, "grouped_df")) {
+    force(partitioning)
+    # Drop the grouping metadata before writing; we've already consumed it
+    # now to construct `partitioning` and don't want it in the metadata$r
+    dataset <- dplyr::ungroup(dataset)
+  }
+
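+  # If `partitioning` is a character vector of column names, build a
+  # Partitioning (Hive-style or plain directory-style) from those schema fields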
+  scanner <- Scanner$create(dataset, use_async = TRUE)
+  if (!inherits(partitioning, "Partitioning")) {
+    partition_schema <- scanner$schema[partitioning]
+    if (isTRUE(hive_style)) {
+      partitioning <- HivePartitioning$create(partition_schema, null_fallback = list(...)$null_fallback)
+    } else {
+      partitioning <- DirectoryPartitioning$create(partition_schema)
+    }
+  }
+
+  path_and_fs <- get_path_and_filesystem(path)
+  options <- FileWriteOptions$create(format, table = scanner, ...)
+
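+  # Convert the validated string option to the 0-based integer index expected
+  # by the C++ binding; note this order differs from the R-level default order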
+  existing_data_behavior_opts <- c("delete_matching", "overwrite", "error")
+  existing_data_behavior <- match(match.arg(existing_data_behavior), existing_data_behavior_opts) - 1L
+
+  dataset___Dataset__Write(
+    options, path_and_fs$fs, path_and_fs$path,
+    partitioning, basename_template, scanner,
+    existing_data_behavior
+  )
+}