# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

#' @title RecordBatchReader classes
#' @description Apache Arrow defines two formats for [serializing data for interprocess
#' communication
#' (IPC)](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc):
#' a "stream" format and a "file" format, known as Feather.
#' `RecordBatchStreamReader` and `RecordBatchFileReader` are
#' interfaces for accessing record batches from input sources in those formats,
#' respectively.
#'
#' For guidance on how to use these classes, see the examples section.
#'
#' @seealso [read_ipc_stream()] and [read_feather()] provide a much simpler interface
#' for reading data from these formats and are sufficient for many use cases.
#' @usage NULL
#' @format NULL
#' @docType class
#' @section Factory:
#'
#' The `RecordBatchFileReader$create()` and `RecordBatchStreamReader$create()`
#' factory methods instantiate the object and
#' take a single argument, named according to the class:
#'
#' - `file` A raw vector, [Buffer], or Arrow file connection object
#'    (e.g. [RandomAccessFile]). Only raw vectors and Buffers are wrapped for
#'    you; to read from a file path, open it first (e.g. with
#'    `ReadableFile$create()`, as in the examples).
#' - `stream` A raw vector, [Buffer], or [InputStream].
#'
#' @section Methods:
#'
#' - `$read_next_batch()`: Returns a `RecordBatch`, iterating through the
#'    Reader. If there are no further batches in the Reader, it returns `NULL`.
#' - `$schema`: Returns a [Schema] (active binding)
#' - `$batches()`: Returns a list of `RecordBatch`es
#' - `$read_table()`: Collects the reader's `RecordBatch`es into a [Table]
#' - `$get_batch(i)`: For `RecordBatchFileReader`, return a particular batch
#'    by an integer index.
#' - `$num_record_batches`: For `RecordBatchFileReader`, see how many batches
#'    are in the file (active binding).
#'
#' @rdname RecordBatchReader
#' @name RecordBatchReader
#' @include arrow-package.R
#' @examplesIf arrow_available()
#' tf <- tempfile()
#' on.exit(unlink(tf))
#'
#' batch <- record_batch(chickwts)
#'
#' # This opens a connection to the file in Arrow
#' file_obj <- FileOutputStream$create(tf)
#' # Pass that to a RecordBatchWriter to write data conforming to a schema
#' writer <- RecordBatchFileWriter$create(file_obj, batch$schema)
#' writer$write(batch)
#' # You may write additional batches to the stream, provided that they have
#' # the same schema.
#' # Call "close" on the writer to indicate end-of-file/stream
#' writer$close()
#' # Then, close the connection--closing the IPC message does not close the file
#' file_obj$close()
#'
#' # Now, we have a file we can read from.
#' # Same pattern: open file connection,
#' # then pass it to a RecordBatchReader
#' read_file_obj <- ReadableFile$create(tf)
#' reader <- RecordBatchFileReader$create(read_file_obj)
#' # RecordBatchFileReader knows how many batches it has (StreamReader does not)
#' reader$num_record_batches
#' # We could consume the Reader by calling $read_next_batch() until all are
#' # consumed, or we can call $read_table() to pull them all into a Table
#' tab <- reader$read_table()
#' # Call as.data.frame to turn that Table into an R data.frame
#' df <- as.data.frame(tab)
#' # This should be the same data we sent
#' all.equal(df, chickwts, check.attributes = FALSE)
#' # Unlike the Writers, we don't have to close RecordBatchReaders,
#' # but we do still need to close the file connection
#' read_file_obj$close()
RecordBatchReader <- R6Class("RecordBatchReader",
  inherit = ArrowObject,
  # Every member is a thin wrapper that hands `self` to the correspondingly
  # named binding; no state is kept on the R side.
  public = list(
    read_next_batch = function() RecordBatchReader__ReadNext(self),
    batches = function() RecordBatchReader__batches(self),
    read_table = function() Table__from_RecordBatchReader(self),
    # `stream_ptr` is passed through unchanged to the export binding.
    export_to_c = function(stream_ptr) ExportRecordBatchReader(self, stream_ptr)
  ),
  active = list(
    # Active binding: accessed as `reader$schema`, without parentheses.
    schema = function() RecordBatchReader__schema(self)
  )
)

# S3 `head()` method: wraps the reader in a Scanner and delegates to the
# Scanner's `head()`. `...` is accepted to match the generic's signature but
# is not forwarded.
#' @export
head.RecordBatchReader <- function(x, n = 6L, ...) {
  head(Scanner$create(x), n)
}

# S3 `tail()` method: same delegation as `head()` above, via a Scanner.
# `...` is accepted but not forwarded.
#' @export
tail.RecordBatchReader <- function(x, n = 6L, ...) {
  tail(Scanner$create(x), n)
}

#' @rdname RecordBatchReader
#' @usage NULL
#' @format NULL
#' @export
RecordBatchStreamReader <- R6Class("RecordBatchStreamReader",
  inherit = RecordBatchReader
)

# Factory for RecordBatchStreamReader. Raw vectors and Buffers are wrapped in
# a BufferReader first; whatever results is then checked against
# "InputStream" and handed to the stream-open binding.
RecordBatchStreamReader$create <- function(stream) {
  if (inherits(stream, c("raw", "Buffer"))) {
    # TODO: deprecate this because it doesn't close the connection to the Buffer
    # (that's a problem, right?)
    stream <- BufferReader$create(stream)
  }
  # NOTE(review): assert_is() is defined elsewhere; presumably it errors
  # unless `stream` inherits from "InputStream" -- confirm.
  assert_is(stream, "InputStream")
  ipc___RecordBatchStreamReader__Open(stream)
}

# Both classes expose the same import function under `$import_from_c`.
#' @include arrowExports.R
RecordBatchReader$import_from_c <- RecordBatchStreamReader$import_from_c <- ImportRecordBatchReader

#' @rdname RecordBatchReader
#' @usage NULL
#' @format NULL
#' @export
RecordBatchFileReader <- R6Class("RecordBatchFileReader",
  inherit = ArrowObject,
  # Why doesn't this inherit from RecordBatchReader in C++?
  # Origin: https://github.com/apache/arrow/pull/679
  public = list(
    # `i` is forwarded unchanged to the binding.
    # NOTE(review): whether the index is 0- or 1-based is not visible here.
    get_batch = function(i) {
      ipc___RecordBatchFileReader__ReadRecordBatch(self, i)
    },
    batches = function() {
      ipc___RecordBatchFileReader__batches(self)
    },
    read_table = function() Table__from_RecordBatchFileReader(self)
  ),
  active = list(
    # Active bindings: accessed as `reader$num_record_batches` and
    # `reader$schema`, without parentheses.
    num_record_batches = function() ipc___RecordBatchFileReader__num_record_batches(self),
    schema = function() ipc___RecordBatchFileReader__schema(self)
  )
)

# Factory for RecordBatchFileReader. Mirrors RecordBatchStreamReader$create():
# raw vectors and Buffers are wrapped in a BufferReader, then the input is
# checked against "InputStream" and handed to the file-open binding. There is
# no branch here that opens a character path.
RecordBatchFileReader$create <- function(file) {
  if (inherits(file, c("raw", "Buffer"))) {
    # TODO: deprecate this because it doesn't close the connection to the Buffer
    # (that's a problem, right?)
    file <- BufferReader$create(file)
  }
  # NOTE(review): assert_is() is defined elsewhere; presumably it errors
  # unless `file` inherits from "InputStream" -- confirm.
  assert_is(file, "InputStream")
  ipc___RecordBatchFileReader__Open(file)
}