summary refs log tree commit diff stats
path: root/src/arrow/r/R/record-batch-writer.R
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r-- src/arrow/r/R/record-batch-writer.R 194
1 files changed, 194 insertions, 0 deletions
diff --git a/src/arrow/r/R/record-batch-writer.R b/src/arrow/r/R/record-batch-writer.R
new file mode 100644
index 000000000..8675e785a
--- /dev/null
+++ b/src/arrow/r/R/record-batch-writer.R
@@ -0,0 +1,194 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+#' @title RecordBatchWriter classes
+#' @description Apache Arrow defines two formats for [serializing data for interprocess
+#' communication
+#' (IPC)](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc):
+#' a "stream" format and a "file" format, known as Feather.
+#' `RecordBatchStreamWriter` and `RecordBatchFileWriter` are
+#' interfaces for writing record batches to those formats, respectively.
+#'
+#' For guidance on how to use these classes, see the examples section.
+#'
+#' @seealso [write_ipc_stream()] and [write_feather()] provide a much simpler
+#' interface for writing data to these formats and are sufficient for many use
+#' cases. [write_to_raw()] is a version that serializes data to a buffer.
+#' @usage NULL
+#' @format NULL
+#' @docType class
+#' @section Factory:
+#'
+#' The `RecordBatchFileWriter$create()` and `RecordBatchStreamWriter$create()`
+#' factory methods instantiate the object and take the following arguments:
+#'
+#' - `sink` An `OutputStream`
+#' - `schema` A [Schema] for the data to be written
+#' - `use_legacy_format` logical: write data formatted so that Arrow library
+#' versions 0.14 and lower can read it. Default is `FALSE`. You can also
+#' enable this by setting the environment variable `ARROW_PRE_0_15_IPC_FORMAT=1`.
+#' - `metadata_version`: A string like "V5" or the equivalent integer indicating
+#' the Arrow IPC MetadataVersion. Default (NULL) will use the latest version,
+#' unless the environment variable `ARROW_PRE_1_0_METADATA_VERSION=1`, in
+#' which case it will be V4.
+#'
+#' @section Methods:
+#'
+#' - `$write(x)`: Write a [RecordBatch], [Table], or `data.frame`, dispatching
+#' to the methods below appropriately
+#' - `$write_batch(batch)`: Write a `RecordBatch` to stream
+#' - `$write_table(table)`: Write a `Table` to stream
+#' - `$close()`: close stream. Note that this indicates end-of-file or
+#' end-of-stream--it does not close the connection to the `sink`. That needs
+#' to be closed separately.
+#'
+#' @rdname RecordBatchWriter
+#' @name RecordBatchWriter
+#' @include arrow-package.R
+#' @examplesIf arrow_available()
+#' tf <- tempfile()
+#' on.exit(unlink(tf))
+#'
+#' batch <- record_batch(chickwts)
+#'
+#' # This opens a connection to the file in Arrow
+#' file_obj <- FileOutputStream$create(tf)
+#' # Pass that to a RecordBatchWriter to write data conforming to a schema
+#' writer <- RecordBatchFileWriter$create(file_obj, batch$schema)
+#' writer$write(batch)
+#' # You may write additional batches to the stream, provided that they have
+#' # the same schema.
+#' # Call "close" on the writer to indicate end-of-file/stream
+#' writer$close()
+#' # Then, close the connection--closing the IPC message does not close the file
+#' file_obj$close()
+#'
+#' # Now, we have a file we can read from. Same pattern: open file connection,
+#' # then pass it to a RecordBatchReader
+#' read_file_obj <- ReadableFile$create(tf)
+#' reader <- RecordBatchFileReader$create(read_file_obj)
+#' # RecordBatchFileReader knows how many batches it has (StreamReader does not)
+#' reader$num_record_batches
+#' # We could consume the Reader by calling $read_next_batch() until all are
+#' # consumed, or we can call $read_table() to pull them all into a Table
+#' tab <- reader$read_table()
+#' # Call as.data.frame to turn that Table into an R data.frame
+#' df <- as.data.frame(tab)
+#' # This should be the same data we sent
+#' all.equal(df, chickwts, check.attributes = FALSE)
+#' # Unlike the Writers, we don't have to close RecordBatchReaders,
+#' # but we do still need to close the file connection
+#' read_file_obj$close()
+RecordBatchWriter <- R6Class("RecordBatchWriter",
+  inherit = ArrowObject,
+  public = list(
+    # Write a single RecordBatch through the ipc___ binding.
+    write_batch = function(batch) {
+      ipc___RecordBatchWriter__WriteRecordBatch(self, batch)
+    },
+
+    # Write a whole Table through the ipc___ binding.
+    write_table = function(table) {
+      ipc___RecordBatchWriter__WriteTable(self, table)
+    },
+
+    # Generic entry point: a RecordBatch goes to $write_batch(); everything
+    # else goes to $write_table(), being converted with Table$create() first
+    # unless it already is a Table.
+    write = function(x) {
+      if (inherits(x, "RecordBatch")) {
+        self$write_batch(x)
+      } else {
+        tbl <- if (inherits(x, "Table")) x else Table$create(x)
+        self$write_table(tbl)
+      }
+    },
+
+    # Signal end-of-file/stream on the writer through the ipc___ binding.
+    close = function() {
+      ipc___RecordBatchWriter__Close(self)
+    }
+  )
+)
+
+#' @usage NULL
+#' @format NULL
+#' @rdname RecordBatchWriter
+#' @export
+RecordBatchStreamWriter <- R6Class("RecordBatchStreamWriter", inherit = RecordBatchWriter)
+
+# Factory for a writer of the IPC "stream" format.
+#
+# Arguments:
+#   sink              an OutputStream; a string is rejected with a hint to
+#                     wrap it in FileOutputStream$create()
+#   schema            a Schema for the data to be written
+#   use_legacy_format resolved by get_ipc_use_legacy_format()
+#   metadata_version  resolved by get_ipc_metadata_version()
+# Returns whatever ipc___RecordBatchStreamWriter__Open() returns.
+# Errors if `sink` is a string, or if `sink`/`schema` fail assert_is().
+RecordBatchStreamWriter$create <- function(sink,
+                                           schema,
+                                           use_legacy_format = NULL,
+                                           metadata_version = NULL) {
+  if (is.string(sink)) {
+    # deparse() keeps the suggestion readable as code: a string literal stays
+    # quoted and a call expression is not split into its components.
+    stop(
+      "RecordBatchStreamWriter$create() requires an Arrow OutputStream. ",
+      "Try providing FileOutputStream$create(", deparse(substitute(sink)), ")",
+      call. = FALSE
+    )
+  }
+  assert_is(sink, "OutputStream")
+  assert_is(schema, "Schema")
+
+  ipc___RecordBatchStreamWriter__Open(
+    sink,
+    schema,
+    get_ipc_use_legacy_format(use_legacy_format),
+    get_ipc_metadata_version(metadata_version)
+  )
+}
+
+#' @usage NULL
+#' @format NULL
+#' @rdname RecordBatchWriter
+#' @export
+RecordBatchFileWriter <- R6Class("RecordBatchFileWriter", inherit = RecordBatchStreamWriter)
+
+# Factory for a writer of the IPC "file" format.
+#
+# Arguments:
+#   sink              an OutputStream; a string is rejected with a hint to
+#                     wrap it in FileOutputStream$create()
+#   schema            a Schema for the data to be written
+#   use_legacy_format resolved by get_ipc_use_legacy_format()
+#   metadata_version  resolved by get_ipc_metadata_version()
+# Returns whatever ipc___RecordBatchFileWriter__Open() returns.
+# Errors if `sink` is a string, or if `sink`/`schema` fail assert_is().
+RecordBatchFileWriter$create <- function(sink,
+                                         schema,
+                                         use_legacy_format = NULL,
+                                         metadata_version = NULL) {
+  if (is.string(sink)) {
+    # deparse() keeps the suggestion readable as code: a string literal stays
+    # quoted and a call expression is not split into its components.
+    stop(
+      "RecordBatchFileWriter$create() requires an Arrow OutputStream. ",
+      "Try providing FileOutputStream$create(", deparse(substitute(sink)), ")",
+      call. = FALSE
+    )
+  }
+  assert_is(sink, "OutputStream")
+  assert_is(schema, "Schema")
+
+  ipc___RecordBatchFileWriter__Open(
+    sink,
+    schema,
+    get_ipc_use_legacy_format(use_legacy_format),
+    get_ipc_metadata_version(metadata_version)
+  )
+}
+
+# Resolve a user-supplied `metadata_version` to an element of MetadataVersion.
+#
+# x: NULL, a version name such as "V5", or the equivalent whole number (5).
+#   - whole numbers are turned into names by prefixing "V"
+#   - NULL yields "V4" when either ARROW_PRE_1_0_METADATA_VERSION or
+#     ARROW_PRE_0_15_IPC_FORMAT is set to "1", otherwise the last element
+#     of MetadataVersion
+# Returns the matching MetadataVersion element.
+# Errors with "<input> is not a valid IPC MetadataVersion" when nothing matches.
+get_ipc_metadata_version <- function(x) {
+  input <- x
+  if (is.null(x)) {
+    # PRE_1_0 is specific for this;
+    # if you already set PRE_0_15, PRE_1_0 should be implied
+    pre_1_0 <- identical(Sys.getenv("ARROW_PRE_1_0_METADATA_VERSION"), "1") ||
+      identical(Sys.getenv("ARROW_PRE_0_15_IPC_FORMAT"), "1")
+    if (!pre_1_0) {
+      # Take the latest
+      return(MetadataVersion[[length(MetadataVersion)]])
+    }
+    x <- "V4"
+  } else if (is_integerish(x)) {
+    # 4 means "V4", which actually happens to be 3L
+    x <- paste0("V", x)
+  }
+
+  # Look up by name only. Without this guard, values such as TRUE or 4.5 would
+  # be used as positional indices and silently pick a version, and vectors of
+  # length > 1 would fail with an opaque subscript error.
+  out <- if (is.character(x) && length(x) == 1L) MetadataVersion[[x]] else NULL
+  if (is.null(out)) {
+    stop(deparse(input), " is not a valid IPC MetadataVersion", call. = FALSE)
+  }
+  out
+}
+
+# Resolve the `use_legacy_format` argument to a single TRUE/FALSE.
+#
+# x: the user-supplied value; when NULL, the answer is whether the
+#   environment variable ARROW_PRE_0_15_IPC_FORMAT is exactly "1".
+# Returns TRUE only when the resolved value passes isTRUE(), FALSE otherwise.
+get_ipc_use_legacy_format <- function(x) {
+  if (is.null(x)) {
+    x <- identical(Sys.getenv("ARROW_PRE_0_15_IPC_FORMAT"), "1")
+  }
+  isTRUE(x)
+}