summaryrefslogtreecommitdiffstats
path: root/src/arrow/r/R/io.R
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/arrow/r/R/io.R295
1 files changed, 295 insertions, 0 deletions
diff --git a/src/arrow/r/R/io.R b/src/arrow/r/R/io.R
new file mode 100644
index 000000000..898b306a3
--- /dev/null
+++ b/src/arrow/r/R/io.R
@@ -0,0 +1,295 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#' @include arrow-package.R
+#' @include enums.R
+#' @include buffer.R
+
+# OutputStream ------------------------------------------------------------
+
+Writable <- R6Class("Writable",
+ inherit = ArrowObject,
+ public = list(
+ write = function(x) io___Writable__write(self, buffer(x))
+ )
+)
+
+#' @title OutputStream classes
+#' @description `FileOutputStream` is for writing to a file;
+#' `BufferOutputStream` writes to a buffer;
+#' You can create one and pass it to any of the table writers, for example.
+#' @usage NULL
+#' @format NULL
+#' @docType class
+#' @section Factory:
+#'
+#' The `$create()` factory methods instantiate the `OutputStream` object and
+#' take the following arguments, depending on the subclass:
+#'
+#' - `path` For `FileOutputStream`, a character file name
+#' - `initial_capacity` For `BufferOutputStream`, the size in bytes of the
+#' buffer.
+#'
+#' @section Methods:
+#'
+#' - `$tell()`: return the position in the stream
+#' - `$close()`: close the stream
+#' - `$write(x)`: send `x` to the stream
+#' - `$capacity()`: for `BufferOutputStream`
+#' - `$finish()`: for `BufferOutputStream`
+#' - `$GetExtentBytesWritten()`: for `MockOutputStream`, report how many bytes
+#' were sent.
+#'
+#' @rdname OutputStream
+#' @name OutputStream
+OutputStream <- R6Class("OutputStream",
+ inherit = Writable,
+ public = list(
+ close = function() io___OutputStream__Close(self),
+ tell = function() io___OutputStream__Tell(self)
+ )
+)
+
+#' @usage NULL
+#' @format NULL
+#' @rdname OutputStream
+#' @export
+FileOutputStream <- R6Class("FileOutputStream", inherit = OutputStream)
+FileOutputStream$create <- function(path) {
+ io___FileOutputStream__Open(clean_path_abs(path))
+}
+
+#' @usage NULL
+#' @format NULL
+#' @rdname OutputStream
+#' @export
+BufferOutputStream <- R6Class("BufferOutputStream",
+ inherit = OutputStream,
+ public = list(
+ capacity = function() io___BufferOutputStream__capacity(self),
+ finish = function() io___BufferOutputStream__Finish(self),
+ write = function(bytes) io___BufferOutputStream__Write(self, bytes),
+ tell = function() io___BufferOutputStream__Tell(self)
+ )
+)
+BufferOutputStream$create <- function(initial_capacity = 0L) {
+ io___BufferOutputStream__Create(initial_capacity)
+}
+
+# InputStream -------------------------------------------------------------
+
+
+Readable <- R6Class("Readable",
+ inherit = ArrowObject,
+ public = list(
+ Read = function(nbytes) io___Readable__Read(self, nbytes)
+ )
+)
+
+#' @title InputStream classes
+#' @description `RandomAccessFile` inherits from `InputStream` and is a base
+#' class for: `ReadableFile` for reading from a file; `MemoryMappedFile` for
+#' the same but with memory mapping; and `BufferReader` for reading from a
+#' buffer. Use these with the various table readers.
+#' @usage NULL
+#' @format NULL
+#' @docType class
+#' @section Factory:
+#'
+#' The `$create()` factory methods instantiate the `InputStream` object and
+#' take the following arguments, depending on the subclass:
+#'
+#' - `path` For `ReadableFile`, a character file name
+#' - `x` For `BufferReader`, a [Buffer] or an object that can be
+#' made into a buffer via `buffer()`.
+#'
+#' To instantiate a `MemoryMappedFile`, call [mmap_open()].
+#'
+#' @section Methods:
+#'
+#' - `$GetSize()`:
+#' - `$supports_zero_copy()`: Logical
+#' - `$seek(position)`: go to that position in the stream
+#' - `$tell()`: return the position in the stream
+#' - `$close()`: close the stream
+#' - `$Read(nbytes)`: read data from the stream, either a specified `nbytes` or
+#' all, if `nbytes` is not provided
+#' - `$ReadAt(position, nbytes)`: similar to `$seek(position)$Read(nbytes)`
+#' - `$Resize(size)`: for a `MemoryMappedFile` that is writeable
+#'
+#' @rdname InputStream
+#' @name InputStream
+InputStream <- R6Class("InputStream",
+ inherit = Readable,
+ public = list(
+ close = function() io___InputStream__Close(self)
+ )
+)
+
+#' @usage NULL
+#' @format NULL
+#' @rdname InputStream
+#' @export
+RandomAccessFile <- R6Class("RandomAccessFile",
+ inherit = InputStream,
+ public = list(
+ GetSize = function() io___RandomAccessFile__GetSize(self),
+ supports_zero_copy = function() io___RandomAccessFile__supports_zero_copy(self),
+ seek = function(position) io___RandomAccessFile__Seek(self, position),
+ tell = function() io___RandomAccessFile__Tell(self),
+ Read = function(nbytes = NULL) {
+ if (is.null(nbytes)) {
+ io___RandomAccessFile__Read0(self)
+ } else {
+ io___Readable__Read(self, nbytes)
+ }
+ },
+ ReadAt = function(position, nbytes = NULL) {
+ if (is.null(nbytes)) {
+ nbytes <- self$GetSize() - position
+ }
+ io___RandomAccessFile__ReadAt(self, position, nbytes)
+ }
+ )
+)
+
+#' @usage NULL
+#' @format NULL
+#' @rdname InputStream
+#' @export
+MemoryMappedFile <- R6Class("MemoryMappedFile",
+ inherit = RandomAccessFile,
+ public = list(
+ Resize = function(size) io___MemoryMappedFile__Resize(self, size)
+ )
+)
+
+#' @usage NULL
+#' @format NULL
+#' @rdname InputStream
+#' @export
+ReadableFile <- R6Class("ReadableFile", inherit = RandomAccessFile)
+ReadableFile$create <- function(path) {
+ io___ReadableFile__Open(clean_path_abs(path))
+}
+
+#' @usage NULL
+#' @format NULL
+#' @rdname InputStream
+#' @export
+BufferReader <- R6Class("BufferReader", inherit = RandomAccessFile)
+BufferReader$create <- function(x) {
+ x <- buffer(x)
+ io___BufferReader__initialize(x)
+}
+
+#' Create a new read/write memory mapped file of a given size
+#'
+#' @param path file path
+#' @param size size in bytes
+#'
+#' @return a [arrow::io::MemoryMappedFile][MemoryMappedFile]
+#'
+#' @export
+mmap_create <- function(path, size) {
+ path <- clean_path_abs(path)
+ io___MemoryMappedFile__Create(path, size)
+}
+
+#' Open a memory mapped file
+#'
+#' @param path file path
+#' @param mode file mode (read/write/readwrite)
+#'
+#' @export
+mmap_open <- function(path, mode = c("read", "write", "readwrite")) {
+ mode <- match(match.arg(mode), c("read", "write", "readwrite")) - 1L
+ path <- clean_path_abs(path)
+ io___MemoryMappedFile__Open(path, mode)
+}
+
+#' Handle a range of possible input sources
+#' @param file A character file name, `raw` vector, or an Arrow input stream
+#' @param mmap Logical: whether to memory-map the file (default `TRUE`)
+#' @param compression If the file is compressed, created a [CompressedInputStream]
+#' with this compression codec, either a [Codec] or the string name of one.
+#' If `NULL` (default) and `file` is a string file name, the function will try
+#' to infer compression from the file extension.
+#' @param filesystem If not `NULL`, `file` will be opened via the
+#' `filesystem$OpenInputFile()` filesystem method, rather than the `io` module's
+#' `MemoryMappedFile` or `ReadableFile` constructors.
+#' @return An `InputStream` or a subclass of one.
+#' @keywords internal
+make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem = NULL) {
+ if (inherits(file, "SubTreeFileSystem")) {
+ filesystem <- file$base_fs
+ file <- file$base_path
+ }
+ if (is.string(file)) {
+ if (is_url(file)) {
+ fs_and_path <- FileSystem$from_uri(file)
+ filesystem <- fs_and_path$fs
+ file <- fs_and_path$path
+ }
+ if (is.null(compression)) {
+ # Infer compression from the file path
+ compression <- detect_compression(file)
+ }
+ if (!is.null(filesystem)) {
+ file <- filesystem$OpenInputFile(file)
+ } else if (isTRUE(mmap)) {
+ file <- mmap_open(file)
+ } else {
+ file <- ReadableFile$create(file)
+ }
+ if (!identical(compression, "uncompressed")) {
+ file <- CompressedInputStream$create(file, compression)
+ }
+ } else if (inherits(file, c("raw", "Buffer"))) {
+ file <- BufferReader$create(file)
+ }
+ assert_is(file, "InputStream")
+ file
+}
+
+make_output_stream <- function(x, filesystem = NULL) {
+ if (inherits(x, "SubTreeFileSystem")) {
+ filesystem <- x$base_fs
+ x <- x$base_path
+ } else if (is_url(x)) {
+ fs_and_path <- FileSystem$from_uri(x)
+ filesystem <- fs_and_path$fs
+ x <- fs_and_path$path
+ }
+ assert_that(is.string(x))
+ if (is.null(filesystem)) {
+ FileOutputStream$create(x)
+ } else {
+ filesystem$OpenOutputStream(x)
+ }
+}
+
+detect_compression <- function(path) {
+ assert_that(is.string(path))
+ switch(tools::file_ext(path),
+ bz2 = "bz2",
+ gz = "gzip",
+ lz4 = "lz4",
+ zst = "zstd",
+ "uncompressed"
+ )
+}