diff options
Diffstat (limited to '')
-rw-r--r-- | src/arrow/r/R/record-batch-reader.R | 164 |
1 files changed, 164 insertions, 0 deletions
diff --git a/src/arrow/r/R/record-batch-reader.R b/src/arrow/r/R/record-batch-reader.R new file mode 100644 index 000000000..1542e3649 --- /dev/null +++ b/src/arrow/r/R/record-batch-reader.R @@ -0,0 +1,164 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +#' @title RecordBatchReader classes +#' @description Apache Arrow defines two formats for [serializing data for interprocess +#' communication +#' (IPC)](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc): +#' a "stream" format and a "file" format, known as Feather. +#' `RecordBatchStreamReader` and `RecordBatchFileReader` are +#' interfaces for accessing record batches from input sources in those formats, +#' respectively. +#' +#' For guidance on how to use these classes, see the examples section. +#' +#' @seealso [read_ipc_stream()] and [read_feather()] provide a much simpler interface +#' for reading data from these formats and are sufficient for many use cases. 
+#' @usage NULL +#' @format NULL +#' @docType class +#' @section Factory: +#' +#' The `RecordBatchFileReader$create()` and `RecordBatchStreamReader$create()` +#' factory methods instantiate the object and +#' take a single argument, named according to the class: +#' +#' - `file` A raw vector, [Buffer], or Arrow file connection object +#' (e.g. [RandomAccessFile]). +#' - `stream` A raw vector, [Buffer], or [InputStream]. +#' +#' @section Methods: +#' +#' - `$read_next_batch()`: Returns a `RecordBatch`, iterating through the +#' Reader. If there are no further batches in the Reader, it returns `NULL`. +#' - `$schema`: Returns a [Schema] (active binding) +#' - `$batches()`: Returns a list of `RecordBatch`es +#' - `$read_table()`: Collects the reader's `RecordBatch`es into a [Table] +#' - `$get_batch(i)`: For `RecordBatchFileReader`, return a particular batch +#' by an integer index. +#' - `$num_record_batches`: For `RecordBatchFileReader`, see how many batches +#' are in the file (active binding). +#' +#' @rdname RecordBatchReader +#' @name RecordBatchReader +#' @include arrow-package.R +#' @examplesIf arrow_available() +#' tf <- tempfile() +#' on.exit(unlink(tf)) +#' +#' batch <- record_batch(chickwts) +#' +#' # This opens a connection to the file in Arrow +#' file_obj <- FileOutputStream$create(tf) +#' # Pass that to a RecordBatchWriter to write data conforming to a schema +#' writer <- RecordBatchFileWriter$create(file_obj, batch$schema) +#' writer$write(batch) +#' # You may write additional batches to the stream, provided that they have +#' # the same schema. +#' # Call "close" on the writer to indicate end-of-file/stream +#' writer$close() +#' # Then, close the connection--closing the IPC message does not close the file +#' file_obj$close() +#' +#' # Now, we have a file we can read from.
#' # Same pattern: open file connection,
#' # then pass it to a RecordBatchReader
#' read_file_obj <- ReadableFile$create(tf)
#' reader <- RecordBatchFileReader$create(read_file_obj)
#' # RecordBatchFileReader knows how many batches it has (StreamReader does not)
#' reader$num_record_batches
#' # We could consume the Reader by calling $read_next_batch() until all are
#' # consumed, or we can call $read_table() to pull them all into a Table
#' tab <- reader$read_table()
#' # Call as.data.frame to turn that Table into an R data.frame
#' df <- as.data.frame(tab)
#' # This should be the same data we sent
#' all.equal(df, chickwts, check.attributes = FALSE)
#' # Unlike the Writers, we don't have to close RecordBatchReaders,
#' # but we do still need to close the file connection
#' read_file_obj$close()
RecordBatchReader <- R6Class(
  "RecordBatchReader",
  inherit = ArrowObject,
  public = list(
    # Each public method forwards `self` to the binding of the matching name.
    read_next_batch = function() {
      RecordBatchReader__ReadNext(self)
    },
    batches = function() {
      RecordBatchReader__batches(self)
    },
    read_table = function() {
      Table__from_RecordBatchReader(self)
    },
    # `stream_ptr` is handed to the export binding unchanged.
    export_to_c = function(stream_ptr) {
      ExportRecordBatchReader(self, stream_ptr)
    }
  ),
  active = list(
    # Active binding: accessed as `reader$schema`, without parentheses.
    schema = function() {
      RecordBatchReader__schema(self)
    }
  )
)

#' @export
head.RecordBatchReader <- function(x, n = 6L, ...) {
  # Wrap the reader in a Scanner, then take the first `n` rows from that.
  scanner <- Scanner$create(x)
  head(scanner, n)
}

#' @export
tail.RecordBatchReader <- function(x, n = 6L, ...) {
  # Wrap the reader in a Scanner, then take the last `n` rows from that.
  scanner <- Scanner$create(x)
  tail(scanner, n)
}

#' @rdname RecordBatchReader
#' @usage NULL
#' @format NULL
#' @export
RecordBatchStreamReader <- R6Class(
  "RecordBatchStreamReader",
  inherit = RecordBatchReader
)
RecordBatchStreamReader$create <- function(stream) {
  needs_wrapping <- inherits(stream, "raw") || inherits(stream, "Buffer")
  if (needs_wrapping) {
    # TODO: deprecate this because it doesn't close the connection to the Buffer
    # (that's a problem, right?)
    stream <- BufferReader$create(stream)
  }
  # Whatever was passed in must be an InputStream by this point.
  assert_is(stream, "InputStream")
  ipc___RecordBatchStreamReader__Open(stream)
}
#' @include arrowExports.R
RecordBatchStreamReader$import_from_c <- ImportRecordBatchReader
RecordBatchReader$import_from_c <- ImportRecordBatchReader

#' @rdname RecordBatchReader
#' @usage NULL
#' @format NULL
#' @export
RecordBatchFileReader <- R6Class(
  "RecordBatchFileReader",
  inherit = ArrowObject,
  # Why doesn't this inherit from RecordBatchReader in C++?
  # Origin: https://github.com/apache/arrow/pull/679
  public = list(
    # `i` is passed straight through to the binding as the batch index.
    get_batch = function(i) {
      ipc___RecordBatchFileReader__ReadRecordBatch(self, i)
    },
    batches = function() {
      ipc___RecordBatchFileReader__batches(self)
    },
    read_table = function() {
      Table__from_RecordBatchFileReader(self)
    }
  ),
  active = list(
    # Active bindings: read as `reader$num_record_batches` / `reader$schema`.
    num_record_batches = function() {
      ipc___RecordBatchFileReader__num_record_batches(self)
    },
    schema = function() {
      ipc___RecordBatchFileReader__schema(self)
    }
  )
)
RecordBatchFileReader$create <- function(file) {
  needs_wrapping <- inherits(file, "raw") || inherits(file, "Buffer")
  if (needs_wrapping) {
    # TODO: deprecate this because it doesn't close the connection to the Buffer
    # (that's a problem, right?)
    file <- BufferReader$create(file)
  }
  # Whatever was passed in must be an InputStream by this point.
  assert_is(file, "InputStream")
  ipc___RecordBatchFileReader__Open(file)
}