summaryrefslogtreecommitdiffstats
path: root/src/arrow/r/R/dataset-scan.R
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/r/R/dataset-scan.R
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/r/R/dataset-scan.R')
-rw-r--r--src/arrow/r/R/dataset-scan.R262
1 files changed, 262 insertions, 0 deletions
diff --git a/src/arrow/r/R/dataset-scan.R b/src/arrow/r/R/dataset-scan.R
new file mode 100644
index 000000000..03c926fb4
--- /dev/null
+++ b/src/arrow/r/R/dataset-scan.R
@@ -0,0 +1,262 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#' Scan the contents of a dataset
+#'
+#' @description
+#' A `Scanner` iterates over a [Dataset]'s fragments and returns data
+#' according to given row filtering and column projection. A `ScannerBuilder`
+#' can help create one.
+#'
+#' @section Factory:
+#' `Scanner$create()` wraps the `ScannerBuilder` interface to make a `Scanner`.
+#' It takes the following arguments:
+#'
+#' * `dataset`: A `Dataset` or `arrow_dplyr_query` object, as returned by the
+#' `dplyr` methods on `Dataset`.
+#' * `projection`: A character vector of column names to select columns or a
+#' named list of expressions
+#' * `filter`: A `Expression` to filter the scanned rows by, or `TRUE` (default)
+#' to keep all rows.
+#' * `use_threads`: logical: should scanning use multithreading? Default `TRUE`
+#' * `use_async`: logical: should the async scanner (performs better on
+#' high-latency/highly parallel filesystems like S3) be used? Default `FALSE`
+#' * `...`: Additional arguments, currently ignored
+#' @section Methods:
+#' `ScannerBuilder` has the following methods:
+#'
+#' - `$Project(cols)`: Indicate that the scan should only return columns given
+#' by `cols`, a character vector of column names
+#' - `$Filter(expr)`: Filter rows by an [Expression].
+#' - `$UseThreads(threads)`: logical: should the scan use multithreading?
+#' The method's default input is `TRUE`, but you must call the method to enable
+#' multithreading because the scanner default is `FALSE`.
+#' - `$UseAsync(use_async)`: logical: should the async scanner be used?
+#' - `$BatchSize(batch_size)`: integer: Maximum row count of scanned record
+#' batches, default is 32K. If scanned record batches are overflowing memory
+#' then this method can be called to reduce their size.
+#' - `$schema`: Active binding, returns the [Schema] of the Dataset
+#' - `$Finish()`: Returns a `Scanner`
+#'
+#' `Scanner` currently has a single method, `$ToTable()`, which evaluates the
+#' query and returns an Arrow [Table].
+#' @rdname Scanner
+#' @name Scanner
+#' @export
+Scanner <- R6Class("Scanner",
+ inherit = ArrowObject,
+ public = list(
+ ToTable = function() dataset___Scanner__ToTable(self),
+ ScanBatches = function() dataset___Scanner__ScanBatches(self),
+ ToRecordBatchReader = function() dataset___Scanner__ToRecordBatchReader(self),
+ CountRows = function() dataset___Scanner__CountRows(self)
+ ),
+ active = list(
+ schema = function() dataset___Scanner__schema(self)
+ )
+)
+Scanner$create <- function(dataset,
+ projection = NULL,
+ filter = TRUE,
+ use_threads = option_use_threads(),
+ use_async = getOption("arrow.use_async", FALSE),
+ batch_size = NULL,
+ fragment_scan_options = NULL,
+ ...) {
+ if (inherits(dataset, "arrow_dplyr_query")) {
+ if (is_collapsed(dataset)) {
+ # TODO: Is there a way to get a RecordBatchReader rather than evaluating?
+ dataset$.data <- as_adq(dplyr::compute(dataset$.data))$.data
+ }
+
+ proj <- c(dataset$selected_columns, dataset$temp_columns)
+
+ if (!is.null(projection)) {
+ if (is.character(projection)) {
+ stopifnot("attempting to project with unknown columns" = all(projection %in% names(proj)))
+ proj <- proj[projection]
+ } else {
+ # TODO: ARROW-13802 accepting lists of Expressions as a projection
+ warning(
+ "Scanner$create(projection = ...) must be a character vector, ",
+ "ignoring the projection argument."
+ )
+ }
+ }
+
+ if (!isTRUE(filter)) {
+ dataset <- set_filters(dataset, filter)
+ }
+
+ return(Scanner$create(
+ dataset$.data,
+ proj,
+ dataset$filtered_rows,
+ use_threads,
+ use_async,
+ batch_size,
+ fragment_scan_options,
+ ...
+ ))
+ }
+
+ scanner_builder <- ScannerBuilder$create(dataset)
+ if (use_threads) {
+ scanner_builder$UseThreads()
+ }
+ if (use_async) {
+ scanner_builder$UseAsync()
+ }
+ if (!is.null(projection)) {
+ scanner_builder$Project(projection)
+ }
+ if (!isTRUE(filter)) {
+ scanner_builder$Filter(filter)
+ }
+ if (is_integerish(batch_size)) {
+ scanner_builder$BatchSize(batch_size)
+ }
+ if (!is.null(fragment_scan_options)) {
+ scanner_builder$FragmentScanOptions(fragment_scan_options)
+ }
+ scanner_builder$Finish()
+}
+
+#' @export
+names.Scanner <- function(x) names(x$schema)
+
+#' @export
+head.Scanner <- function(x, n = 6L, ...) {
+ assert_that(n > 0) # For now
+ dataset___Scanner__head(x, n)
+}
+
+#' @export
+tail.Scanner <- function(x, n = 6L, ...) {
+ assert_that(n > 0) # For now
+ result <- list()
+ batch_num <- 0
+ for (batch in rev(dataset___Scanner__ScanBatches(x))) {
+ batch_num <- batch_num + 1
+ result[[batch_num]] <- tail(batch, n)
+ n <- n - nrow(batch)
+ if (n <= 0) break
+ }
+ Table$create(!!!rev(result))
+}
+
+ScanTask <- R6Class("ScanTask",
+ inherit = ArrowObject,
+ public = list(
+ Execute = function() dataset___ScanTask__get_batches(self)
+ )
+)
+
+#' Apply a function to a stream of RecordBatches
+#'
+#' As an alternative to calling `collect()` on a `Dataset` query, you can
+#' use this function to access the stream of `RecordBatch`es in the `Dataset`.
+#' This lets you aggregate on each chunk and pull the intermediate results into
+#' a `data.frame` for further aggregation, even if you couldn't fit the whole
+#' `Dataset` result in memory.
+#'
+#' This is experimental and not recommended for production use.
+#'
+#' @param X A `Dataset` or `arrow_dplyr_query` object, as returned by the
+#' `dplyr` methods on `Dataset`.
+#' @param FUN A function or `purrr`-style lambda expression to apply to each
+#' batch
+#' @param ... Additional arguments passed to `FUN`
+#' @param .data.frame logical: collect the resulting chunks into a single
+#' `data.frame`? Default `TRUE`
+#' @export
+map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
+ if (.data.frame) {
+ lapply <- map_dfr
+ }
+ scanner <- Scanner$create(ensure_group_vars(X))
+ FUN <- as_mapper(FUN)
+ lapply(scanner$ScanBatches(), function(batch) {
+ # TODO: wrap batch in arrow_dplyr_query with X$selected_columns,
+ # X$temp_columns, and X$group_by_vars
+ # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE
+ FUN(batch, ...)
+ })
+}
+
+#' @usage NULL
+#' @format NULL
+#' @rdname Scanner
+#' @export
+ScannerBuilder <- R6Class("ScannerBuilder",
+ inherit = ArrowObject,
+ public = list(
+ Project = function(cols) {
+ # cols is either a character vector or a named list of Expressions
+ if (is.character(cols)) {
+ dataset___ScannerBuilder__ProjectNames(self, cols)
+ } else if (length(cols) == 0) {
+ # Empty projection
+ dataset___ScannerBuilder__ProjectNames(self, character(0))
+ } else {
+ # List of Expressions
+ dataset___ScannerBuilder__ProjectExprs(self, cols, names(cols))
+ }
+ self
+ },
+ Filter = function(expr) {
+ assert_is(expr, "Expression")
+ dataset___ScannerBuilder__Filter(self, expr)
+ self
+ },
+ UseThreads = function(threads = option_use_threads()) {
+ dataset___ScannerBuilder__UseThreads(self, threads)
+ self
+ },
+ UseAsync = function(use_async = TRUE) {
+ dataset___ScannerBuilder__UseAsync(self, use_async)
+ self
+ },
+ BatchSize = function(batch_size) {
+ dataset___ScannerBuilder__BatchSize(self, batch_size)
+ self
+ },
+ FragmentScanOptions = function(options) {
+ dataset___ScannerBuilder__FragmentScanOptions(self, options)
+ self
+ },
+ Finish = function() dataset___ScannerBuilder__Finish(self)
+ ),
+ active = list(
+ schema = function() dataset___ScannerBuilder__schema(self)
+ )
+)
+ScannerBuilder$create <- function(dataset) {
+ if (inherits(dataset, "RecordBatchReader")) {
+ return(dataset___ScannerBuilder__FromRecordBatchReader(dataset))
+ }
+
+ if (inherits(dataset, c("data.frame", "ArrowTabular"))) {
+ dataset <- InMemoryDataset$create(dataset)
+ }
+ assert_is(dataset, "Dataset")
+
+ dataset$NewScan()
+}
+
+#' @export
+names.ScannerBuilder <- function(x) names(x$schema)