summaryrefslogtreecommitdiffstats
path: root/src/arrow/r/R/filesystem.R
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/arrow/r/R/filesystem.R505
1 files changed, 505 insertions, 0 deletions
diff --git a/src/arrow/r/R/filesystem.R b/src/arrow/r/R/filesystem.R
new file mode 100644
index 000000000..a09d0a51d
--- /dev/null
+++ b/src/arrow/r/R/filesystem.R
@@ -0,0 +1,505 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#' @include arrow-package.R
+#' @title FileSystem entry info
+#' @usage NULL
+#' @format NULL
+#'
+#' @section Methods:
+#'
+#' - `base_name()` : The file base name (component after the last directory
+#' separator).
+#' - `extension()` : The file extension
+#'
+#' @section Active bindings:
+#'
+#' - `$type`: The file type
+#' - `$path`: The full file path in the filesystem
+#' - `$size`: The size in bytes, if available. Only regular files are
+#' guaranteed to have a size.
+#' - `$mtime`: The time of last modification, if available.
+#'
+#' @rdname FileInfo
+#' @export
+FileInfo <- R6Class("FileInfo",
+ inherit = ArrowObject,
+ public = list(
+ base_name = function() fs___FileInfo__base_name(self),
+ extension = function() fs___FileInfo__extension(self)
+ ),
+ active = list(
+ type = function(type) {
+ if (missing(type)) {
+ fs___FileInfo__type(self)
+ } else {
+ fs___FileInfo__set_type(self, type)
+ }
+ },
+ path = function(path) {
+ if (missing(path)) {
+ fs___FileInfo__path(self)
+ } else {
+ invisible(fs___FileInfo__set_path(self))
+ }
+ },
+ size = function(size) {
+ if (missing(size)) {
+ fs___FileInfo__size(self)
+ } else {
+ invisible(fs___FileInfo__set_size(self, size))
+ }
+ },
+ mtime = function(time) {
+ if (missing(time)) {
+ fs___FileInfo__mtime(self)
+ } else {
+ if (!inherits(time, "POSIXct") && length(time) == 1L) {
+ abort("invalid time")
+ }
+ invisible(fs___FileInfo__set_mtime(self, time))
+ }
+ }
+ )
+)
+
+#' @title file selector
+#' @format NULL
+#'
+#' @section Factory:
+#'
+#' The `$create()` factory method instantiates a `FileSelector` given the 3 fields
+#' described below.
+#'
+#' @section Fields:
+#'
+#' - `base_dir`: The directory in which to select files. If the path exists but
+#' doesn't point to a directory, this should be an error.
+#' - `allow_not_found`: The behavior if `base_dir` doesn't exist in the
+#' filesystem. If `FALSE`, an error is returned. If `TRUE`, an empty
+#' selection is returned
+#' - `recursive`: Whether to recurse into subdirectories.
+#'
+#' @rdname FileSelector
+#' @export
+FileSelector <- R6Class("FileSelector",
+ inherit = ArrowObject,
+ active = list(
+ base_dir = function() fs___FileSelector__base_dir(self),
+ allow_not_found = function() fs___FileSelector__allow_not_found(self),
+ recursive = function() fs___FileSelector__recursive(self)
+ )
+)
+
+FileSelector$create <- function(base_dir, allow_not_found = FALSE, recursive = FALSE) {
+ fs___FileSelector__create(clean_path_rel(base_dir), allow_not_found, recursive)
+}
+
+#' @title FileSystem classes
+#' @description `FileSystem` is an abstract file system API,
+#' `LocalFileSystem` is an implementation accessing files
+#' on the local machine. `SubTreeFileSystem` is an implementation that delegates
+#' to another implementation after prepending a fixed base path
+#'
+#' @section Factory:
+#'
+#' `LocalFileSystem$create()` returns the object and takes no arguments.
+#'
+#' `SubTreeFileSystem$create()` takes the following arguments:
+#'
+#' - `base_path`, a string path
+#' - `base_fs`, a `FileSystem` object
+#'
+#' `S3FileSystem$create()` optionally takes arguments:
+#'
+#' - `anonymous`: logical, default `FALSE`. If true, will not attempt to look up
+#' credentials using standard AWS configuration methods.
+#' - `access_key`, `secret_key`: authentication credentials. If one is provided,
+#' the other must be as well. If both are provided, they will override any
+#' AWS configuration set at the environment level.
+#' - `session_token`: optional string for authentication along with
+#' `access_key` and `secret_key`
+#' - `role_arn`: string AWS ARN of an AccessRole. If provided instead of `access_key` and
+#' `secret_key`, temporary credentials will be fetched by assuming this role.
+#' - `session_name`: optional string identifier for the assumed role session.
+#' - `external_id`: optional unique string identifier that might be required
+#' when you assume a role in another account.
+#' - `load_frequency`: integer, frequency (in seconds) with which temporary
+#' credentials from an assumed role session will be refreshed. Default is
+#' 900 (i.e. 15 minutes)
+#' - `region`: AWS region to connect to. If omitted, the AWS library will
+#' provide a sensible default based on client configuration, falling back
+#' to "us-east-1" if no other alternatives are found.
+#' - `endpoint_override`: If non-empty, override region with a connect string
+#' such as "localhost:9000". This is useful for connecting to file systems
+#' that emulate S3.
+#' - `scheme`: S3 connection transport (default "https")
+#' - `background_writes`: logical, whether `OutputStream` writes will be issued
+#' in the background, without blocking (default `TRUE`)
+#'
+#' @section Methods:
+#'
+#' - `$GetFileInfo(x)`: `x` may be a [FileSelector][FileSelector] or a character
+#' vector of paths. Returns a list of [FileInfo][FileInfo]
+#' - `$CreateDir(path, recursive = TRUE)`: Create a directory and subdirectories.
+#' - `$DeleteDir(path)`: Delete a directory and its contents, recursively.
+#' - `$DeleteDirContents(path)`: Delete a directory's contents, recursively.
+#' Like `$DeleteDir()`,
+#' but doesn't delete the directory itself. Passing an empty path (`""`) will
+#' wipe the entire filesystem tree.
+#' - `$DeleteFile(path)` : Delete a file.
+#' - `$DeleteFiles(paths)` : Delete many files. The default implementation
+#' issues individual delete operations in sequence.
+#' - `$Move(src, dest)`: Move / rename a file or directory. If the destination
+#' exists:
+#' if it is a non-empty directory, an error is returned
+#' otherwise, if it has the same type as the source, it is replaced
+#' otherwise, behavior is unspecified (implementation-dependent).
+#' - `$CopyFile(src, dest)`: Copy a file. If the destination exists and is a
+#' directory, an error is returned. Otherwise, it is replaced.
+#' - `$OpenInputStream(path)`: Open an [input stream][InputStream] for
+#' sequential reading.
+#' - `$OpenInputFile(path)`: Open an [input file][RandomAccessFile] for random
+#' access reading.
+#' - `$OpenOutputStream(path)`: Open an [output stream][OutputStream] for
+#' sequential writing.
+#' - `$OpenAppendStream(path)`: Open an [output stream][OutputStream] for
+#' appending.
+#'
+#' @section Active bindings:
+#'
+#' - `$type_name`: string filesystem type name, such as "local", "s3", etc.
+#' - `$region`: string AWS region, for `S3FileSystem` and `SubTreeFileSystem`
+#' containing a `S3FileSystem`
+#' - `$base_fs`: for `SubTreeFileSystem`, the `FileSystem` it contains
+#' - `$base_path`: for `SubTreeFileSystem`, the path in `$base_fs` which is considered
+#' root in this `SubTreeFileSystem`.
+#'
+#' @usage NULL
+#' @format NULL
+#' @docType class
+#'
+#' @rdname FileSystem
+#' @name FileSystem
+#' @export
+FileSystem <- R6Class("FileSystem",
+ inherit = ArrowObject,
+ public = list(
+ GetFileInfo = function(x) {
+ if (inherits(x, "FileSelector")) {
+ fs___FileSystem__GetTargetInfos_FileSelector(self, x)
+ } else if (is.character(x)) {
+ fs___FileSystem__GetTargetInfos_Paths(self, clean_path_rel(x))
+ } else {
+ abort("incompatible type for FileSystem$GetFileInfo()")
+ }
+ },
+ CreateDir = function(path, recursive = TRUE) {
+ fs___FileSystem__CreateDir(self, clean_path_rel(path), isTRUE(recursive))
+ },
+ DeleteDir = function(path) {
+ fs___FileSystem__DeleteDir(self, clean_path_rel(path))
+ },
+ DeleteDirContents = function(path) {
+ fs___FileSystem__DeleteDirContents(self, clean_path_rel(path))
+ },
+ DeleteFile = function(path) {
+ fs___FileSystem__DeleteFile(self, clean_path_rel(path))
+ },
+ DeleteFiles = function(paths) {
+ fs___FileSystem__DeleteFiles(self, clean_path_rel(paths))
+ },
+ Move = function(src, dest) {
+ fs___FileSystem__Move(self, clean_path_rel(src), clean_path_rel(dest))
+ },
+ CopyFile = function(src, dest) {
+ fs___FileSystem__CopyFile(self, clean_path_rel(src), clean_path_rel(dest))
+ },
+ OpenInputStream = function(path) {
+ fs___FileSystem__OpenInputStream(self, clean_path_rel(path))
+ },
+ OpenInputFile = function(path) {
+ fs___FileSystem__OpenInputFile(self, clean_path_rel(path))
+ },
+ OpenOutputStream = function(path) {
+ fs___FileSystem__OpenOutputStream(self, clean_path_rel(path))
+ },
+ OpenAppendStream = function(path) {
+ fs___FileSystem__OpenAppendStream(self, clean_path_rel(path))
+ },
+
+ # Friendlier R user interface
+ path = function(x) SubTreeFileSystem$create(x, self),
+ cd = function(x) SubTreeFileSystem$create(x, self),
+ ls = function(path = "", ...) {
+ selector <- FileSelector$create(path, ...) # ... for recursive = TRUE
+ infos <- self$GetFileInfo(selector)
+ map_chr(infos, ~ .$path)
+ # TODO: add full.names argument like base::dir() (default right now is TRUE)
+ # TODO: see fs package for glob/regexp filtering
+ # TODO: verbose method that shows other attributes as df
+ # TODO: print methods for FileInfo, SubTreeFileSystem, S3FileSystem
+ }
+ ),
+ active = list(
+ type_name = function() fs___FileSystem__type_name(self)
+ )
+)
+FileSystem$from_uri <- function(uri) {
+ assert_that(is.string(uri))
+ fs___FileSystemFromUri(uri)
+}
+
+get_paths_and_filesystem <- function(x, filesystem = NULL) {
+ # Wrapper around FileSystem$from_uri that handles local paths
+ # and an optional explicit filesystem
+ if (inherits(x, "SubTreeFileSystem")) {
+ return(list(fs = x$base_fs, path = x$base_path))
+ }
+ assert_that(is.character(x))
+ are_urls <- are_urls(x)
+ if (any(are_urls)) {
+ if (!all(are_urls)) {
+ stop("Vectors of mixed paths and URIs are not supported", call. = FALSE)
+ }
+ if (!is.null(filesystem)) {
+ # Stop? Can't have URL (which yields a fs) and another fs
+ }
+ x <- lapply(x, FileSystem$from_uri)
+ if (length(unique(map(x, ~ class(.$fs)))) > 1) {
+ stop(
+ "Vectors of URIs for different file systems are not supported",
+ call. = FALSE
+ )
+ }
+ fs <- x[[1]]$fs
+ path <- map_chr(x, ~ .$path) # singular name "path" used for compatibility
+ } else {
+ fs <- filesystem %||% LocalFileSystem$create()
+ if (inherits(fs, "LocalFileSystem")) {
+ path <- clean_path_abs(x)
+ } else {
+ path <- clean_path_rel(x)
+ }
+ }
+ list(
+ fs = fs,
+ path = path
+ )
+}
+
+# variant of the above function that asserts that x is either a scalar string
+# or a SubTreeFileSystem
+get_path_and_filesystem <- function(x, filesystem = NULL) {
+ assert_that(is.string(x) || inherits(x, "SubTreeFileSystem"))
+ get_paths_and_filesystem(x, filesystem)
+}
+
+is_url <- function(x) is.string(x) && grepl("://", x)
+are_urls <- function(x) if (!is.character(x)) FALSE else grepl("://", x)
+
+#' @usage NULL
+#' @format NULL
+#' @rdname FileSystem
+#' @export
+LocalFileSystem <- R6Class("LocalFileSystem", inherit = FileSystem)
+LocalFileSystem$create <- function() {
+ fs___LocalFileSystem__create()
+}
+
+#' @usage NULL
+#' @format NULL
+#' @rdname FileSystem
+#' @importFrom utils modifyList
+#' @export
+S3FileSystem <- R6Class("S3FileSystem",
+ inherit = FileSystem,
+ active = list(
+ region = function() fs___S3FileSystem__region(self)
+ )
+)
+S3FileSystem$create <- function(anonymous = FALSE, ...) {
+ args <- list2(...)
+ if (anonymous) {
+ invalid_args <- intersect(
+ c(
+ "access_key", "secret_key", "session_token", "role_arn", "session_name",
+ "external_id", "load_frequency"
+ ),
+ names(args)
+ )
+ if (length(invalid_args)) {
+ stop("Cannot specify ", oxford_paste(invalid_args), " when anonymous = TRUE", call. = FALSE)
+ }
+ } else {
+ keys_present <- length(intersect(c("access_key", "secret_key"), names(args)))
+ if (keys_present == 1) {
+ stop("Key authentication requires both access_key and secret_key", call. = FALSE)
+ }
+ if ("session_token" %in% names(args) && keys_present != 2) {
+ stop(
+ "In order to initialize a session with temporary credentials, ",
+ "both secret_key and access_key must be provided ",
+ "in addition to session_token.",
+ call. = FALSE
+ )
+ }
+ arn <- "role_arn" %in% names(args)
+ if (keys_present == 2 && arn) {
+ stop("Cannot provide both key authentication and role_arn", call. = FALSE)
+ }
+ arn_extras <- intersect(c("session_name", "external_id", "load_frequency"), names(args))
+ if (length(arn_extras) > 0 && !arn) {
+ stop("Cannot specify ", oxford_paste(arn_extras), " without providing a role_arn string", call. = FALSE)
+ }
+ }
+ args <- c(modifyList(default_s3_options, args), anonymous = anonymous)
+ exec(fs___S3FileSystem__create, !!!args)
+}
+
+default_s3_options <- list(
+ access_key = "",
+ secret_key = "",
+ session_token = "",
+ role_arn = "",
+ session_name = "",
+ external_id = "",
+ load_frequency = 900L,
+ region = "",
+ endpoint_override = "",
+ scheme = "",
+ background_writes = TRUE
+)
+
+#' Connect to an AWS S3 bucket
+#'
+#' `s3_bucket()` is a convenience function to create an `S3FileSystem` object
+#' that automatically detects the bucket's AWS region and holding onto the its
+#' relative path.
+#'
+#' @param bucket string S3 bucket name or path
+#' @param ... Additional connection options, passed to `S3FileSystem$create()`
+#' @return A `SubTreeFileSystem` containing an `S3FileSystem` and the bucket's
+#' relative path. Note that this function's success does not guarantee that you
+#' are authorized to access the bucket's contents.
+#' @examplesIf arrow_with_s3()
+#' bucket <- s3_bucket("ursa-labs-taxi-data")
+#' @export
+s3_bucket <- function(bucket, ...) {
+ assert_that(is.string(bucket))
+ args <- list2(...)
+
+ # Use FileSystemFromUri to detect the bucket's region
+ if (!is_url(bucket)) {
+ bucket <- paste0("s3://", bucket)
+ }
+ fs_and_path <- FileSystem$from_uri(bucket)
+ fs <- fs_and_path$fs
+ # If there are no additional S3Options, we can use that filesystem
+ # Otherwise, take the region that was detected and make a new fs with the args
+ if (length(args)) {
+ args$region <- fs$region
+ fs <- exec(S3FileSystem$create, !!!args)
+ }
+ # Return a subtree pointing at that bucket path
+ SubTreeFileSystem$create(fs_and_path$path, fs)
+}
+
+#' @usage NULL
+#' @format NULL
+#' @rdname FileSystem
+#' @export
+SubTreeFileSystem <- R6Class("SubTreeFileSystem",
+ inherit = FileSystem,
+ public = list(
+ print = function(...) {
+ if (inherits(self$base_fs, "LocalFileSystem")) {
+ cat("SubTreeFileSystem: ", "file://", self$base_path, "\n", sep = "")
+ } else if (inherits(self$base_fs, "S3FileSystem")) {
+ cat("SubTreeFileSystem: ", "s3://", self$base_path, "\n", sep = "")
+ } else {
+ cat("SubTreeFileSystem", "\n", sep = "")
+ }
+ invisible(self)
+ }
+ ),
+ active = list(
+ base_fs = function() {
+ fs___SubTreeFileSystem__base_fs(self)
+ },
+ base_path = function() fs___SubTreeFileSystem__base_path(self)
+ )
+)
+SubTreeFileSystem$create <- function(base_path, base_fs = NULL) {
+ fs_and_path <- get_path_and_filesystem(base_path, base_fs)
+ fs___SubTreeFileSystem__create(fs_and_path$path, fs_and_path$fs)
+}
+
+#' @export
+`$.SubTreeFileSystem` <- function(x, name, ...) {
+ # This is to allow delegating methods/properties to the base_fs
+ assert_that(is.string(name))
+ if (name %in% ls(envir = x)) {
+ get(name, x)
+ } else if (name %in% ls(envir = x$base_fs)) {
+ get(name, x$base_fs)
+ } else {
+ NULL
+ }
+}
+
+#' Copy files between FileSystems
+#'
+#' @param from A string path to a local directory or file, a URI, or a
+#' `SubTreeFileSystem`. Files will be copied recursively from this path.
+#' @param to A string path to a local directory or file, a URI, or a
+#' `SubTreeFileSystem`. Directories will be created as necessary
+#' @param chunk_size The maximum size of block to read before flushing
+#' to the destination file. A larger chunk_size will use more memory while
+#' copying but may help accommodate high latency FileSystems.
+#' @return Nothing: called for side effects in the file system
+#' @export
+#' @examplesIf FALSE
+#' # Copy an S3 bucket's files to a local directory:
+#' copy_files("s3://your-bucket-name", "local-directory")
+#' # Using a FileSystem object
+#' copy_files(s3_bucket("your-bucket-name"), "local-directory")
+#' # Or go the other way, from local to S3
+#' copy_files("local-directory", s3_bucket("your-bucket-name"))
+copy_files <- function(from, to, chunk_size = 1024L * 1024L) {
+ from <- get_path_and_filesystem(from)
+ to <- get_path_and_filesystem(to)
+ invisible(fs___CopyFiles(
+ from$fs,
+ FileSelector$create(from$path, recursive = TRUE),
+ to$fs,
+ to$path,
+ chunk_size,
+ option_use_threads()
+ ))
+}
+
+clean_path_abs <- function(path) {
+ # Make sure we have a valid, absolute, forward-slashed path for passing to Arrow
+ normalizePath(path, winslash = "/", mustWork = FALSE)
+}
+
+clean_path_rel <- function(path) {
+ # Make sure all path separators are "/", not "\" as on Windows
+ path_sep <- ifelse(tolower(Sys.info()[["sysname"]]) == "windows", "\\\\", "/")
+ gsub(path_sep, "/", path)
+}