diff options
Diffstat (limited to '')
-rw-r--r-- | src/arrow/r/R/record-batch.R | 193 |
1 files changed, 193 insertions, 0 deletions
diff --git a/src/arrow/r/R/record-batch.R b/src/arrow/r/R/record-batch.R new file mode 100644 index 000000000..c66ff7fb0 --- /dev/null +++ b/src/arrow/r/R/record-batch.R @@ -0,0 +1,193 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +#' @include arrow-package.R +#' @include array.R +#' @title RecordBatch class +#' @description A record batch is a collection of equal-length arrays matching +#' a particular [Schema]. It is a table-like data structure that is semantically +#' a sequence of [fields][Field], each a contiguous Arrow [Array]. +#' @usage NULL +#' @format NULL +#' @docType class +#' +#' @section S3 Methods and Usage: +#' Record batches are data-frame-like, and many methods you expect to work on +#' a `data.frame` are implemented for `RecordBatch`. This includes `[`, `[[`, +#' `$`, `names`, `dim`, `nrow`, `ncol`, `head`, and `tail`. You can also pull +#' the data from an Arrow record batch into R with `as.data.frame()`. See the +#' examples. +#' +#' A caveat about the `$` method: because `RecordBatch` is an `R6` object, +#' `$` is also used to access the object's methods (see below). Methods take +#' precedence over the table's columns. So, `batch$Slice` would return the +#' "Slice" method function even if there were a column in the table called +#' "Slice". +#' +#' @section R6 Methods: +#' In addition to the more R-friendly S3 methods, a `RecordBatch` object has +#' the following R6 methods that map onto the underlying C++ methods: +#' +#' - `$Equals(other)`: Returns `TRUE` if the `other` record batch is equal +#' - `$column(i)`: Extract an `Array` by integer position from the batch +#' - `$column_name(i)`: Get a column's name by integer position +#' - `$names()`: Get all column names (called by `names(batch)`) +#' - `$RenameColumns(value)`: Set all column names (called by `names(batch) <- value`) +#' - `$GetColumnByName(name)`: Extract an `Array` by string name +#' - `$RemoveColumn(i)`: Drops a column from the batch by integer position +#' - `$SelectColumns(indices)`: Return a new record batch with a selection of columns, expressed as 0-based integers. +#' - `$Slice(offset, length = NULL)`: Create a zero-copy view starting at the +#' indicated integer offset and going for the given length, or to the end +#' of the table if `NULL`, the default. +#' - `$Take(i)`: return an `RecordBatch` with rows at positions given by +#' integers (R vector or Array Array) `i`. +#' - `$Filter(i, keep_na = TRUE)`: return an `RecordBatch` with rows at positions where logical +#' vector (or Arrow boolean Array) `i` is `TRUE`. +#' - `$SortIndices(names, descending = FALSE)`: return an `Array` of integer row +#' positions that can be used to rearrange the `RecordBatch` in ascending or +#' descending order by the first named column, breaking ties with further named +#' columns. `descending` can be a logical vector of length one or of the same +#' length as `names`. +#' - `$serialize()`: Returns a raw vector suitable for interprocess communication +#' - `$cast(target_schema, safe = TRUE, options = cast_options(safe))`: Alter +#' the schema of the record batch. +#' +#' There are also some active bindings +#' - `$num_columns` +#' - `$num_rows` +#' - `$schema` +#' - `$metadata`: Returns the key-value metadata of the `Schema` as a named list. +#' Modify or replace by assigning in (`batch$metadata <- new_metadata`). +#' All list elements are coerced to string. See `schema()` for more information. +#' - `$columns`: Returns a list of `Array`s +#' @rdname RecordBatch +#' @name RecordBatch +#' @export +RecordBatch <- R6Class("RecordBatch", + inherit = ArrowTabular, + public = list( + column = function(i) RecordBatch__column(self, i), + column_name = function(i) RecordBatch__column_name(self, i), + names = function() RecordBatch__names(self), + RenameColumns = function(value) RecordBatch__RenameColumns(self, value), + Equals = function(other, check_metadata = FALSE, ...) { + inherits(other, "RecordBatch") && RecordBatch__Equals(self, other, isTRUE(check_metadata)) + }, + GetColumnByName = function(name) { + assert_that(is.string(name)) + RecordBatch__GetColumnByName(self, name) + }, + SelectColumns = function(indices) RecordBatch__SelectColumns(self, indices), + AddColumn = function(i, new_field, value) { + RecordBatch__AddColumn(self, i, new_field, value) + }, + SetColumn = function(i, new_field, value) { + RecordBatch__SetColumn(self, i, new_field, value) + }, + RemoveColumn = function(i) RecordBatch__RemoveColumn(self, i), + ReplaceSchemaMetadata = function(new) { + RecordBatch__ReplaceSchemaMetadata(self, new) + }, + Slice = function(offset, length = NULL) { + if (is.null(length)) { + RecordBatch__Slice1(self, offset) + } else { + RecordBatch__Slice2(self, offset, length) + } + }, + # Take, Filter, and SortIndices are methods on ArrowTabular + serialize = function() ipc___SerializeRecordBatch__Raw(self), + to_data_frame = function() { + RecordBatch__to_dataframe(self, use_threads = option_use_threads()) + }, + cast = function(target_schema, safe = TRUE, ..., options = cast_options(safe, ...)) { + assert_is(target_schema, "Schema") + assert_that(identical(self$schema$names, target_schema$names), msg = "incompatible schemas") + RecordBatch__cast(self, target_schema, options) + }, + invalidate = function() { + .Call(`_arrow_RecordBatch__Reset`, self) + super$invalidate() + }, + export_to_c = function(array_ptr, schema_ptr) { + ExportRecordBatch(self, array_ptr, schema_ptr) + } + ), + active = list( + num_columns = function() RecordBatch__num_columns(self), + num_rows = function() RecordBatch__num_rows(self), + schema = function() RecordBatch__schema(self), + columns = function() RecordBatch__columns(self) + ) +) + +RecordBatch$create <- function(..., schema = NULL) { + arrays <- list2(...) + if (length(arrays) == 1 && inherits(arrays[[1]], c("raw", "Buffer", "InputStream", "Message"))) { + return(RecordBatch$from_message(arrays[[1]], schema)) + } + + # Else, a list of arrays or data.frames + # making sure there are always names + if (is.null(names(arrays))) { + names(arrays) <- rep_len("", length(arrays)) + } + stopifnot(length(arrays) > 0) + + # If any arrays are length 1, recycle them + arrays <- recycle_scalars(arrays) + + # TODO: should this also assert that they're all Arrays? + RecordBatch__from_arrays(schema, arrays) +} + +RecordBatch$from_message <- function(obj, schema) { + # Message/Buffer readers, previously in read_record_batch() + assert_is(schema, "Schema") + if (inherits(obj, c("raw", "Buffer"))) { + obj <- BufferReader$create(obj) + on.exit(obj$close()) + } + if (inherits(obj, "InputStream")) { + ipc___ReadRecordBatch__InputStream__Schema(obj, schema) + } else { + ipc___ReadRecordBatch__Message__Schema(obj, schema) + } +} +#' @include arrowExports.R +RecordBatch$import_from_c <- ImportRecordBatch + +#' @param ... A `data.frame` or a named set of Arrays or vectors. If given a +#' mixture of data.frames and vectors, the inputs will be autospliced together +#' (see examples). Alternatively, you can provide a single Arrow IPC +#' `InputStream`, `Message`, `Buffer`, or R `raw` object containing a `Buffer`. +#' @param schema a [Schema], or `NULL` (the default) to infer the schema from +#' the data in `...`. When providing an Arrow IPC buffer, `schema` is required. +#' @rdname RecordBatch +#' @examplesIf arrow_available() +#' batch <- record_batch(name = rownames(mtcars), mtcars) +#' dim(batch) +#' dim(head(batch)) +#' names(batch) +#' batch$mpg +#' batch[["cyl"]] +#' as.data.frame(batch[4:8, c("gear", "hp", "wt")]) +#' @export +record_batch <- RecordBatch$create + +#' @export +names.RecordBatch <- function(x) x$names() |