Diffstat (limited to 'src/arrow/r/R/dataset.R')
-rw-r--r-- | src/arrow/r/R/dataset.R | 367
1 files changed, 367 insertions, 0 deletions
diff --git a/src/arrow/r/R/dataset.R b/src/arrow/r/R/dataset.R
new file mode 100644
index 000000000..7207a5543
--- /dev/null
+++ b/src/arrow/r/R/dataset.R
@@ -0,0 +1,367 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#' Open a multi-file dataset
+#'
+#' Arrow Datasets allow you to query against data that has been split across
+#' multiple files. This sharding of data may indicate partitioning, which
+#' can accelerate queries that only touch some partitions (files). Call
+#' `open_dataset()` to point to a directory of data files and return a
+#' `Dataset`, then use `dplyr` methods to query it.
+#'
+#' @param sources One of:
+#'   * a string path or URI to a directory containing data files
+#'   * a string path or URI to a single file
+#'   * a character vector of paths or URIs to individual data files
+#'   * a list of `Dataset` objects as created by this function
+#'   * a list of `DatasetFactory` objects as created by [dataset_factory()].
+#'
+#' When `sources` is a vector of file URIs, they must all use the same protocol
+#' and point to files located in the same file system and having the same
+#' format.
+#' @param schema [Schema] for the `Dataset`. If `NULL` (the default), the schema
+#'   will be inferred from the data sources.
+#' @param partitioning When `sources` is a directory path/URI, one of:
+#'   * a `Schema`, in which case the file paths relative to `sources` will be
+#'     parsed, and path segments will be matched with the schema fields. For
+#'     example, `schema(year = int16(), month = int8())` would create partitions
+#'     for file paths like `"2019/01/file.parquet"`, `"2019/02/file.parquet"`,
+#'     etc.
+#'   * a character vector that defines the field names corresponding to those
+#'     path segments (that is, you're providing the names that would correspond
+#'     to a `Schema` but the types will be autodetected)
+#'   * a `HivePartitioning` or `HivePartitioningFactory`, as returned
+#'     by [hive_partition()], which parses explicit or autodetected fields from
+#'     Hive-style path segments
+#'   * `NULL` for no partitioning
+#'
+#' The default is to autodetect Hive-style partitions. When `sources` is not a
+#' directory path/URI, `partitioning` is ignored.
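+#'
+#' For example (an illustrative sketch; the field names and types here are
+#' assumptions), Hive-style paths such as `"year=2019/month=01/file.parquet"`
+#' could be parsed with an explicit partitioning:
+#'
+#' ```
+#' hive_partition(year = int16(), month = int8())
+#' ```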
+#' @param unify_schemas logical: should all data fragments (files, `Dataset`s)
+#'   be scanned in order to create a unified schema from them? If `FALSE`, only
+#'   the first fragment will be inspected for its schema. Use this fast path
+#'   when you know and trust that all fragments have an identical schema.
+#'   The default is `FALSE` when creating a dataset from a directory path/URI or
+#'   vector of file paths/URIs (because there may be many files and scanning may
+#'   be slow) but `TRUE` when `sources` is a list of `Dataset`s (because there
+#'   should be few `Dataset`s in the list and their `Schema`s are already in
+#'   memory).
+#' @param format A [FileFormat] object, or a string identifier of the format of
+#'   the files in `sources`. This argument is ignored when `sources` is a list of
+#'   `Dataset` objects. Currently supported values:
+#'   * "parquet"
+#'   * "ipc"/"arrow"/"feather", all aliases for each other; for Feather, note
+#'     that only version 2 files are supported
+#'   * "csv"/"text", aliases for the same thing (because comma is the default
+#'     delimiter for text files)
+#'   * "tsv", equivalent to passing `format = "text", delimiter = "\t"`
+#'
+#'   Default is "parquet", unless a `delimiter` is also specified, in which case
+#'   it is assumed to be "text".
+#' @param ... additional arguments passed to `dataset_factory()` when `sources`
+#'   is a directory path/URI or vector of file paths/URIs, otherwise ignored.
+#'   These may include `format` to indicate the file format, or other
+#'   format-specific options.
+#' @return A [Dataset] R6 object. Use `dplyr` methods on it to query the data,
+#'   or call [`$NewScan()`][Scanner] to construct a query directly.
+#' @export
+#' @seealso `vignette("dataset", package = "arrow")`
+#' @include arrow-package.R
+#' @examplesIf arrow_with_dataset() & arrow_with_parquet()
+#' # Set up directory for examples
+#' tf <- tempfile()
+#' dir.create(tf)
+#' on.exit(unlink(tf))
+#'
+#' data <- dplyr::group_by(mtcars, cyl)
+#' write_dataset(data, tf)
+#'
+#' # You can specify a directory containing the files for your dataset and
+#' # open_dataset will scan all files in your directory.
+#' open_dataset(tf)
+#'
+#' # You can also supply a vector of paths
+#' open_dataset(c(file.path(tf, "cyl=4/part-0.parquet"), file.path(tf, "cyl=8/part-0.parquet")))
+#'
+#' ## You must specify the file format if using a format other than parquet.
+#' tf2 <- tempfile()
+#' dir.create(tf2)
+#' on.exit(unlink(tf2))
+#' write_dataset(data, tf2, format = "ipc")
+#' # This line will result in an error when you try to work with the data
+#' \dontrun{
+#' open_dataset(tf2)
+#' }
+#' # This line will work
+#' open_dataset(tf2, format = "ipc")
+#'
+#' ## You can specify file partitioning to include it as a field in your dataset
+#' # Create a temporary directory and write example dataset
+#' tf3 <- tempfile()
+#' dir.create(tf3)
+#' on.exit(unlink(tf3))
+#' write_dataset(airquality, tf3, partitioning = c("Month", "Day"), hive_style = FALSE)
+#'
+#' # View files - you can see the partitioning means that files have been written
+#' # to folders based on Month/Day values
+#' tf3_files <- list.files(tf3, recursive = TRUE)
+#'
+#' # With no partitioning specified, dataset contains all files but doesn't include
+#' # directory names as field names
+#' open_dataset(tf3)
+#'
+#' # Now that partitioning has been specified, your dataset contains columns for Month and Day
+#' open_dataset(tf3, partitioning = c("Month", "Day"))
+#'
+#' # If you want to specify the data types for your fields, you can pass in a Schema
+#' open_dataset(tf3, partitioning = schema(Month = int8(), Day = int8()))
+open_dataset <- function(sources,
+                         schema = NULL,
+                         partitioning = hive_partition(),
+                         unify_schemas = NULL,
+                         format = c("parquet", "arrow", "ipc", "feather", "csv", "tsv", "text"),
+                         ...) {
+  if (!arrow_with_dataset()) {
+    stop("This build of the arrow package does not support Datasets", call. = FALSE)
+  }
+  if (is_list_of(sources, "Dataset")) {
+    if (is.null(schema)) {
+      if (is.null(unify_schemas) || isTRUE(unify_schemas)) {
+        # Default is to unify schemas here
+        schema <- unify_schemas(schemas = map(sources, ~ .$schema))
+      } else {
+        # Take the first one.
+        schema <- sources[[1]]$schema
+      }
+    }
+    # Enforce that all datasets have the same schema
+    assert_is(schema, "Schema")
+    sources <- lapply(sources, function(x) {
+      x$schema <- schema
+      x
+    })
+    return(dataset___UnionDataset__create(sources, schema))
+  }
+
+  factory <- DatasetFactory$create(sources, partitioning = partitioning, format = format, schema = schema, ...)
+  tryCatch(
+    # Default is _not_ to inspect/unify schemas
+    factory$Finish(schema, isTRUE(unify_schemas)),
+    error = function(e) {
+      handle_parquet_io_error(e, format)
+    }
+  )
+}
+
+#' Multi-file datasets
+#'
+#' @description
+#' Arrow Datasets allow you to query against data that has been split across
+#' multiple files. This sharding of data may indicate partitioning, which
+#' can accelerate queries that only touch some partitions (files).
+#'
+#' A `Dataset` contains one or more `Fragments`, such as files, of potentially
+#' differing type and partitioning.
+#'
+#' For `Dataset$create()`, see [open_dataset()], which is an alias for it.
+#'
+#' `DatasetFactory` is used to provide finer control over the creation of `Dataset`s.
+#'
+#' @section Factory:
+#' `DatasetFactory` is used to create a `Dataset`, inspect the [Schema] of the
+#' fragments contained in it, and declare a partitioning.
+#' `FileSystemDatasetFactory` is a subclass of `DatasetFactory` for
+#' discovering files in the local file system, the only currently supported
+#' file system.
+#'
+#' For the `DatasetFactory$create()` factory method, see [dataset_factory()], an
+#' alias for it. A `DatasetFactory` has:
+#'
+#' - `$Inspect(unify_schemas)`: If `unify_schemas` is `TRUE`, all fragments
+#'   will be scanned and a unified [Schema] will be created from them; if `FALSE`
+#'   (default), only the first fragment will be inspected for its schema. Use this
+#'   fast path when you know and trust that all fragments have an identical schema.
+#' - `$Finish(schema, unify_schemas)`: Returns a `Dataset`. If `schema` is provided,
+#'   it will be used for the `Dataset`; if omitted, a `Schema` will be created from
+#'   inspecting the fragments (files) in the dataset, following `unify_schemas`
+#'   as described above.
+#'
+#' `FileSystemDatasetFactory$create()` is a lower-level factory method and
+#' takes the following arguments:
+#' * `filesystem`: A [FileSystem]
+#' * `selector`: Either a [FileSelector] or `NULL`
+#' * `paths`: Either a character vector of file paths or `NULL`
+#' * `format`: A [FileFormat]
+#' * `partitioning`: Either `Partitioning`, `PartitioningFactory`, or `NULL`
+#' @section Methods:
+#'
+#' A `Dataset` has the following methods:
+#' - `$NewScan()`: Returns a [ScannerBuilder] for building a query
+#' - `$schema`: Active binding that returns the [Schema] of the Dataset; you
+#'   may also replace the dataset's schema by using `ds$schema <- new_schema`.
+#'   This method currently supports only adding, removing, or reordering
+#'   fields in the schema: you cannot alter or cast the field types.
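+#'
+#'   For example (a minimal sketch; the field names and types are hypothetical
+#'   and must already match the dataset's fields), reordering two fields of an
+#'   open dataset `ds`:
+#'
+#'   ```
+#'   ds$schema <- schema(month = int8(), year = int16())
+#'   ```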
+#'
+#' `FileSystemDataset` has the following methods:
+#' - `$files`: Active binding, returns the files of the `FileSystemDataset`
+#' - `$format`: Active binding, returns the [FileFormat] of the `FileSystemDataset`
+#'
+#' `UnionDataset` has the following methods:
+#' - `$children`: Active binding, returns all child `Dataset`s.
+#'
+#' @export
+#' @seealso [open_dataset()] for a simple interface to creating a `Dataset`
+Dataset <- R6Class("Dataset",
+  inherit = ArrowObject,
+  public = list(
+    # @description
+    # Start a new scan of the data
+    # @return A [ScannerBuilder]
+    NewScan = function() dataset___Dataset__NewScan(self),
+    ToString = function() self$schema$ToString()
+  ),
+  active = list(
+    schema = function(schema) {
+      if (missing(schema)) {
+        dataset___Dataset__schema(self)
+      } else {
+        assert_is(schema, "Schema")
+        invisible(dataset___Dataset__ReplaceSchema(self, schema))
+      }
+    },
+    metadata = function() self$schema$metadata,
+    num_rows = function() self$NewScan()$Finish()$CountRows(),
+    num_cols = function() length(self$schema),
+    # @description
+    # Return the Dataset's type.
+    type = function() dataset___Dataset__type_name(self)
+  )
+)
+Dataset$create <- open_dataset
+
+#' @name FileSystemDataset
+#' @rdname Dataset
+#' @export
+FileSystemDataset <- R6Class("FileSystemDataset",
+  inherit = Dataset,
+  public = list(
+    .class_title = function() {
+      nfiles <- length(self$files)
+      file_type <- self$format$type
+      pretty_file_type <- list(
+        parquet = "Parquet",
+        ipc = "Feather"
+      )[[file_type]]
+
+      paste(
+        class(self)[[1]],
+        "with",
+        nfiles,
+        pretty_file_type %||% file_type,
+        ifelse(nfiles == 1, "file", "files")
+      )
+    }
+  ),
+  active = list(
+    # @description
+    # Return the files contained in this `FileSystemDataset`
+    files = function() dataset___FileSystemDataset__files(self),
+    # @description
+    # Return the format of files in this `Dataset`
+    format = function() {
+      dataset___FileSystemDataset__format(self)
+    },
+    # @description
+    # Return the filesystem of files in this `Dataset`
+    filesystem = function() {
+      dataset___FileSystemDataset__filesystem(self)
+    }
+  )
+)
+
+#' @name UnionDataset
+#' @rdname Dataset
+#' @export
+UnionDataset <- R6Class("UnionDataset",
+  inherit = Dataset,
+  active = list(
+    # @description
+    # Return the UnionDataset's child `Dataset`s
+    children = function() {
+      dataset___UnionDataset__children(self)
+    }
+  )
+)
+
+#' @name InMemoryDataset
+#' @rdname Dataset
+#' @export
+InMemoryDataset <- R6Class("InMemoryDataset", inherit = Dataset)
+InMemoryDataset$create <- function(x) {
+  if (!arrow_with_dataset()) {
+    stop("This build of the arrow package does not support Datasets", call. = FALSE)
+  }
+  if (!inherits(x, "Table")) {
+    x <- Table$create(x)
+  }
+  dataset___InMemoryDataset__create(x)
+}
+
+
+#' @export
+names.Dataset <- function(x) names(x$schema)
+
+#' @export
+dim.Dataset <- function(x) c(x$num_rows, x$num_cols)
+
+#' @export
+c.Dataset <- function(...) Dataset$create(list(...))
+
+#' @export
+head.Dataset <- function(x, n = 6L, ...) {
+  head(Scanner$create(x), n)
+}
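+
+# A minimal usage sketch (not run) of the S3 methods above; `tf` and `tf2` are
+# assumed to be dataset directories as in the open_dataset() examples:
+#
+#   ds1 <- open_dataset(tf)
+#   ds2 <- open_dataset(tf2, format = "ipc")
+#   ds <- c(ds1, ds2)  # dispatches to c.Dataset(), creating a UnionDataset
+#   names(ds)          # column names from the unified schema
+#   head(ds, 10)       # scans only enough fragments to return 10 rows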
+
+#' @export
+tail.Dataset <- function(x, n = 6L, ...) {
+  tail(Scanner$create(x), n)
+}
+
+#' @export
+`[.Dataset` <- function(x, i, j, ..., drop = FALSE) {
+  if (nargs() == 2L) {
+    # List-like column extraction (x[i])
+    return(x[, i])
+  }
+  if (!missing(j)) {
+    x <- select.Dataset(x, all_of(j))
+  }
+
+  if (!missing(i)) {
+    x <- take_dataset_rows(x, i)
+  }
+  x
+}
+
+take_dataset_rows <- function(x, i) {
+  if (!is.numeric(i) || any(i < 0)) {
+    stop("Only slicing with positive indices is supported", call. = FALSE)
+  }
+  scanner <- Scanner$create(x)
+  i <- Array$create(i - 1)
+  dataset___Scanner__TakeRows(scanner, i)
+}
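+
+# A minimal usage sketch (not run) of `[.Dataset`; `ds` is assumed to be an
+# already-opened Dataset with columns `Month` and `Day`:
+#
+#   ds[c("Month", "Day")]        # nargs() == 2L: list-like column extraction
+#   ds[1:10, c("Month", "Day")]  # select columns, then take rows 1-10
+#   ds[-1, ]                     # error: only positive indices are supported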