diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/r/R/filesystem.R | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/r/R/filesystem.R')
-rw-r--r-- | src/arrow/r/R/filesystem.R | 505 |
1 files changed, 505 insertions, 0 deletions
diff --git a/src/arrow/r/R/filesystem.R b/src/arrow/r/R/filesystem.R new file mode 100644 index 000000000..a09d0a51d --- /dev/null +++ b/src/arrow/r/R/filesystem.R @@ -0,0 +1,505 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +#' @include arrow-package.R +#' @title FileSystem entry info +#' @usage NULL +#' @format NULL +#' +#' @section Methods: +#' +#' - `base_name()` : The file base name (component after the last directory +#' separator). +#' - `extension()` : The file extension +#' +#' @section Active bindings: +#' +#' - `$type`: The file type +#' - `$path`: The full file path in the filesystem +#' - `$size`: The size in bytes, if available. Only regular files are +#' guaranteed to have a size. +#' - `$mtime`: The time of last modification, if available. 
#'
#' @rdname FileInfo
#' @export
FileInfo <- R6Class("FileInfo",
  inherit = ArrowObject,
  public = list(
    # Both methods forward to the native binding of the same name.
    base_name = function() fs___FileInfo__base_name(self),
    extension = function() fs___FileInfo__extension(self)
  ),
  active = list(
    # Every active binding follows the same convention: called with no
    # argument it reads the field, called with an argument it writes the field
    # and returns invisibly.
    type = function(type) {
      if (missing(type)) {
        fs___FileInfo__type(self)
      } else {
        invisible(fs___FileInfo__set_type(self, type))
      }
    },
    path = function(path) {
      if (missing(path)) {
        fs___FileInfo__path(self)
      } else {
        # The new value has to be forwarded to the setter; previously it was
        # dropped, so assigning `$path` could never succeed.
        invisible(fs___FileInfo__set_path(self, path))
      }
    },
    size = function(size) {
      if (missing(size)) {
        fs___FileInfo__size(self)
      } else {
        invisible(fs___FileInfo__set_size(self, size))
      }
    },
    mtime = function(time) {
      if (missing(time)) {
        fs___FileInfo__mtime(self)
      } else {
        # Accept only a single POSIXct value. The check must reject a value
        # that fails *either* condition (wrong class or wrong length).
        if (!inherits(time, "POSIXct") || length(time) != 1L) {
          abort("invalid time")
        }
        invisible(fs___FileInfo__set_mtime(self, time))
      }
    }
  )
)

#' @title file selector
#' @format NULL
#'
#' @section Factory:
#'
#' The `$create()` factory method instantiates a `FileSelector` given the 3 fields
#' described below.
#'
#' @section Fields:
#'
#' - `base_dir`: The directory in which to select files. If the path exists but
#'   doesn't point to a directory, this should be an error.
#' - `allow_not_found`: The behavior if `base_dir` doesn't exist in the
#'   filesystem. If `FALSE`, an error is returned. If `TRUE`, an empty
#'   selection is returned
#' - `recursive`: Whether to recurse into subdirectories.
#'
#' @rdname FileSelector
#' @export
FileSelector <- R6Class("FileSelector",
  inherit = ArrowObject,
  active = list(
    # Read-only bindings: each one forwards to the native accessor of the
    # same name and takes no value argument.
    base_dir = function() {
      fs___FileSelector__base_dir(self)
    },
    allow_not_found = function() {
      fs___FileSelector__allow_not_found(self)
    },
    recursive = function() {
      fs___FileSelector__recursive(self)
    }
  )
)

# Factory: normalise the separators of `base_dir`, then build the selector
# from the three fields.
FileSelector$create <- function(base_dir, allow_not_found = FALSE, recursive = FALSE) {
  selector_dir <- clean_path_rel(base_dir)
  fs___FileSelector__create(selector_dir, allow_not_found, recursive)
}

#' @title FileSystem classes
#' @description `FileSystem` is an abstract file system API,
#' `LocalFileSystem` is an implementation accessing files
#' on the local machine. `SubTreeFileSystem` is an implementation that delegates
#' to another implementation after prepending a fixed base path
#'
#' @section Factory:
#'
#' `LocalFileSystem$create()` returns the object and takes no arguments.
#'
#' `SubTreeFileSystem$create()` takes the following arguments:
#'
#' - `base_path`, a string path
#' - `base_fs`, a `FileSystem` object
#'
#' `S3FileSystem$create()` optionally takes arguments:
#'
#' - `anonymous`: logical, default `FALSE`. If true, will not attempt to look up
#'    credentials using standard AWS configuration methods.
#' - `access_key`, `secret_key`: authentication credentials. If one is provided,
#'    the other must be as well. If both are provided, they will override any
#'    AWS configuration set at the environment level.
#' - `session_token`: optional string for authentication along with
#'    `access_key` and `secret_key`
#' - `role_arn`: string AWS ARN of an AccessRole. If provided instead of `access_key` and
#'    `secret_key`, temporary credentials will be fetched by assuming this role.
#' - `session_name`: optional string identifier for the assumed role session.
#' - `external_id`: optional unique string identifier that might be required
#'    when you assume a role in another account.
+#' - `load_frequency`: integer, frequency (in seconds) with which temporary +#' credentials from an assumed role session will be refreshed. Default is +#' 900 (i.e. 15 minutes) +#' - `region`: AWS region to connect to. If omitted, the AWS library will +#' provide a sensible default based on client configuration, falling back +#' to "us-east-1" if no other alternatives are found. +#' - `endpoint_override`: If non-empty, override region with a connect string +#' such as "localhost:9000". This is useful for connecting to file systems +#' that emulate S3. +#' - `scheme`: S3 connection transport (default "https") +#' - `background_writes`: logical, whether `OutputStream` writes will be issued +#' in the background, without blocking (default `TRUE`) +#' +#' @section Methods: +#' +#' - `$GetFileInfo(x)`: `x` may be a [FileSelector][FileSelector] or a character +#' vector of paths. Returns a list of [FileInfo][FileInfo] +#' - `$CreateDir(path, recursive = TRUE)`: Create a directory and subdirectories. +#' - `$DeleteDir(path)`: Delete a directory and its contents, recursively. +#' - `$DeleteDirContents(path)`: Delete a directory's contents, recursively. +#' Like `$DeleteDir()`, +#' but doesn't delete the directory itself. Passing an empty path (`""`) will +#' wipe the entire filesystem tree. +#' - `$DeleteFile(path)` : Delete a file. +#' - `$DeleteFiles(paths)` : Delete many files. The default implementation +#' issues individual delete operations in sequence. +#' - `$Move(src, dest)`: Move / rename a file or directory. If the destination +#' exists: +#' if it is a non-empty directory, an error is returned +#' otherwise, if it has the same type as the source, it is replaced +#' otherwise, behavior is unspecified (implementation-dependent). +#' - `$CopyFile(src, dest)`: Copy a file. If the destination exists and is a +#' directory, an error is returned. Otherwise, it is replaced. +#' - `$OpenInputStream(path)`: Open an [input stream][InputStream] for +#' sequential reading. 
#' - `$OpenInputFile(path)`: Open an [input file][RandomAccessFile] for random
#'   access reading.
#' - `$OpenOutputStream(path)`: Open an [output stream][OutputStream] for
#'   sequential writing.
#' - `$OpenAppendStream(path)`: Open an [output stream][OutputStream] for
#'   appending.
#'
#' @section Active bindings:
#'
#' - `$type_name`: string filesystem type name, such as "local", "s3", etc.
#' - `$region`: string AWS region, for `S3FileSystem` and `SubTreeFileSystem`
#'   containing a `S3FileSystem`
#' - `$base_fs`: for `SubTreeFileSystem`, the `FileSystem` it contains
#' - `$base_path`: for `SubTreeFileSystem`, the path in `$base_fs` which is considered
#'   root in this `SubTreeFileSystem`.
#'
#' @usage NULL
#' @format NULL
#' @docType class
#'
#' @rdname FileSystem
#' @name FileSystem
#' @export
FileSystem <- R6Class("FileSystem",
  inherit = ArrowObject,
  public = list(
    # Dispatch on the argument type: a FileSelector is passed through as is,
    # a character vector of paths has its separators normalised first, and
    # anything else is rejected.
    GetFileInfo = function(x) {
      if (inherits(x, "FileSelector")) {
        return(fs___FileSystem__GetTargetInfos_FileSelector(self, x))
      }
      if (is.character(x)) {
        return(fs___FileSystem__GetTargetInfos_Paths(self, clean_path_rel(x)))
      }
      abort("incompatible type for FileSystem$GetFileInfo()")
    },
    # The methods below all normalise their path argument(s) with
    # clean_path_rel() and then forward to the native binding.
    CreateDir = function(path, recursive = TRUE) {
      target <- clean_path_rel(path)
      fs___FileSystem__CreateDir(self, target, isTRUE(recursive))
    },
    DeleteDir = function(path) {
      target <- clean_path_rel(path)
      fs___FileSystem__DeleteDir(self, target)
    },
    DeleteDirContents = function(path) {
      target <- clean_path_rel(path)
      fs___FileSystem__DeleteDirContents(self, target)
    },
    DeleteFile = function(path) {
      target <- clean_path_rel(path)
      fs___FileSystem__DeleteFile(self, target)
    },
    DeleteFiles = function(paths) {
      targets <- clean_path_rel(paths)
      fs___FileSystem__DeleteFiles(self, targets)
    },
    Move = function(src, dest) {
      origin <- clean_path_rel(src)
      destination <- clean_path_rel(dest)
      fs___FileSystem__Move(self, origin, destination)
    },
    CopyFile = function(src, dest) {
      origin <- clean_path_rel(src)
      destination <- clean_path_rel(dest)
      fs___FileSystem__CopyFile(self, origin, destination)
    },
    OpenInputStream = function(path) {
      target <- clean_path_rel(path)
      fs___FileSystem__OpenInputStream(self, target)
    },
    OpenInputFile = function(path) {
      target <- clean_path_rel(path)
      fs___FileSystem__OpenInputFile(self, target)
    },
    OpenOutputStream = function(path) {
      target <- clean_path_rel(path)
      fs___FileSystem__OpenOutputStream(self, target)
    },
    OpenAppendStream = function(path) {
      target <- clean_path_rel(path)
      fs___FileSystem__OpenAppendStream(self, target)
    },

    # Friendlier R user interface
    # `path()` and `cd()` are two names for the same operation: a
    # SubTreeFileSystem built from `x` and this filesystem.
    path = function(x) {
      SubTreeFileSystem$create(x, self)
    },
    cd = function(x) {
      SubTreeFileSystem$create(x, self)
    },
    ls = function(path = "", ...) {
      # `...` is forwarded to FileSelector$create(), e.g. recursive = TRUE
      selector <- FileSelector$create(path, ...)
      map_chr(self$GetFileInfo(selector), ~ .$path)
      # TODO: add full.names argument like base::dir() (default right now is TRUE)
      # TODO: see fs package for glob/regexp filtering
      # TODO: verbose method that shows other attributes as df
      # TODO: print methods for FileInfo, SubTreeFileSystem, S3FileSystem
    }
  ),
  active = list(
    type_name = function() {
      fs___FileSystem__type_name(self)
    }
  )
)

# Build a filesystem from a single URI string. Callers in this file read
# `$fs` and `$path` from the result.
FileSystem$from_uri <- function(uri) {
  assert_that(is.string(uri))
  fs___FileSystemFromUri(uri)
}

# Wrapper around FileSystem$from_uri that handles local paths
# and an optional explicit filesystem.
#
# `x` is either a SubTreeFileSystem or a character vector holding only URIs or
# only plain paths. Returns list(fs = <filesystem>, path = <character>).
get_paths_and_filesystem <- function(x, filesystem = NULL) {
  if (inherits(x, "SubTreeFileSystem")) {
    return(list(fs = x$base_fs, path = x$base_path))
  }
  assert_that(is.character(x))

  uri_flags <- are_urls(x)
  if (!any(uri_flags)) {
    # Plain paths: use the supplied filesystem, or the local one by default.
    # Local paths are made absolute; other filesystems only get their
    # separators normalised.
    fs <- filesystem %||% LocalFileSystem$create()
    cleaner <- if (inherits(fs, "LocalFileSystem")) clean_path_abs else clean_path_rel
    return(list(fs = fs, path = cleaner(x)))
  }

  if (!all(uri_flags)) {
    stop("Vectors of mixed paths and URIs are not supported", call. = FALSE)
  }
  if (!is.null(filesystem)) {
    # Stop? Can't have URL (which yields a fs) and another fs
  }
  resolved <- lapply(x, FileSystem$from_uri)
  fs_classes <- unique(map(resolved, ~ class(.$fs)))
  if (length(fs_classes) > 1) {
    stop(
      "Vectors of URIs for different file systems are not supported",
      call. = FALSE
    )
  }
  list(
    fs = resolved[[1]]$fs,
    # singular name "path" used for compatibility
    path = map_chr(resolved, ~ .$path)
  )
}

# variant of the above function that asserts that x is either a scalar string
# or a SubTreeFileSystem
get_path_and_filesystem <- function(x, filesystem = NULL) {
  assert_that(is.string(x) || inherits(x, "SubTreeFileSystem"))
  get_paths_and_filesystem(x, filesystem)
}

# Scalar test: TRUE when `x` is a single string containing "://".
is_url <- function(x) {
  is.string(x) && grepl("://", x)
}

# Element-wise test for character vectors; a single FALSE for any other input.
are_urls <- function(x) {
  if (is.character(x)) grepl("://", x) else FALSE
}

#' @usage NULL
#' @format NULL
#' @rdname FileSystem
#' @export
LocalFileSystem <- R6Class("LocalFileSystem", inherit = FileSystem)
LocalFileSystem$create <- function() {
  fs___LocalFileSystem__create()
}

#' @usage NULL
#' @format NULL
#' @rdname FileSystem
#' @importFrom utils modifyList
#' @export
S3FileSystem <- R6Class("S3FileSystem",
  inherit = FileSystem,
  active = list(
    region = function() {
      fs___S3FileSystem__region(self)
    }
  )
)

# Check that the supplied authentication options are mutually consistent,
# fill in the rest from `default_s3_options`, and create the filesystem.
S3FileSystem$create <- function(anonymous = FALSE, ...) {
  args <- list2(...)
  supplied <- names(args)
  if (anonymous) {
    # No credential-related option may accompany anonymous access.
    credential_args <- c(
      "access_key", "secret_key", "session_token", "role_arn", "session_name",
      "external_id", "load_frequency"
    )
    invalid_args <- intersect(credential_args, supplied)
    if (length(invalid_args) > 0) {
      stop("Cannot specify ", oxford_paste(invalid_args), " when anonymous = TRUE", call. = FALSE)
    }
  } else {
    # Number of key options given: 0 (none), 1 (incomplete) or 2 (both).
    keys_present <- length(intersect(c("access_key", "secret_key"), supplied))
    if (keys_present == 1) {
      stop("Key authentication requires both access_key and secret_key", call. = FALSE)
    }
    if ("session_token" %in% supplied && keys_present != 2) {
      stop(
        "In order to initialize a session with temporary credentials, ",
        "both secret_key and access_key must be provided ",
        "in addition to session_token.",
        call. = FALSE
      )
    }
    has_role <- "role_arn" %in% supplied
    if (keys_present == 2 && has_role) {
      stop("Cannot provide both key authentication and role_arn", call. = FALSE)
    }
    arn_extras <- intersect(c("session_name", "external_id", "load_frequency"), supplied)
    if (length(arn_extras) > 0 && !has_role) {
      stop("Cannot specify ", oxford_paste(arn_extras), " without providing a role_arn string", call. = FALSE)
    }
  }
  # User-supplied options override the defaults; `anonymous` is appended last.
  args <- c(modifyList(default_s3_options, args), anonymous = anonymous)
  exec(fs___S3FileSystem__create, !!!args)
}

# Defaults merged underneath whatever the caller passes to S3FileSystem$create().
default_s3_options <- list(
  access_key = "",
  secret_key = "",
  session_token = "",
  role_arn = "",
  session_name = "",
  external_id = "",
  load_frequency = 900L,
  region = "",
  endpoint_override = "",
  scheme = "",
  background_writes = TRUE
)

#' Connect to an AWS S3 bucket
#'
#' `s3_bucket()` is a convenience function to create an `S3FileSystem` object
#' that automatically detects the bucket's AWS region and holds onto its
#' relative path.
#'
#' @param bucket string S3 bucket name or path
#' @param ... Additional connection options, passed to `S3FileSystem$create()`
#' @return A `SubTreeFileSystem` containing an `S3FileSystem` and the bucket's
#' relative path. Note that this function's success does not guarantee that you
#' are authorized to access the bucket's contents.
#' @examplesIf arrow_with_s3()
#' bucket <- s3_bucket("ursa-labs-taxi-data")
#' @export
s3_bucket <- function(bucket, ...) {
  assert_that(is.string(bucket))
  args <- list2(...)

  # Use FileSystemFromUri to detect the bucket's region
  if (!is_url(bucket)) {
    bucket <- paste0("s3://", bucket)
  }
  resolved <- FileSystem$from_uri(bucket)
  fs <- resolved$fs
  # If there are no additional S3Options, we can use that filesystem
  # Otherwise, take the region that was detected and make a new fs with the args
  if (length(args) > 0) {
    args$region <- fs$region
    fs <- exec(S3FileSystem$create, !!!args)
  }
  # Return a subtree pointing at that bucket path
  SubTreeFileSystem$create(resolved$path, fs)
}

#' @usage NULL
#' @format NULL
#' @rdname FileSystem
#' @export
SubTreeFileSystem <- R6Class("SubTreeFileSystem",
  inherit = FileSystem,
  public = list(
    print = function(...) {
      # Show the base path with a scheme prefix when the wrapped filesystem is
      # local or S3; otherwise print just the class name.
      base <- self$base_fs
      scheme <- if (inherits(base, "LocalFileSystem")) {
        "file://"
      } else if (inherits(base, "S3FileSystem")) {
        "s3://"
      } else {
        NULL
      }
      if (is.null(scheme)) {
        cat("SubTreeFileSystem", "\n", sep = "")
      } else {
        cat("SubTreeFileSystem: ", scheme, self$base_path, "\n", sep = "")
      }
      invisible(self)
    }
  ),
  active = list(
    base_fs = function() {
      fs___SubTreeFileSystem__base_fs(self)
    },
    base_path = function() {
      fs___SubTreeFileSystem__base_path(self)
    }
  )
)

# `base_path` may be a string or a SubTreeFileSystem; it is resolved to a
# (filesystem, path) pair before the subtree is created.
SubTreeFileSystem$create <- function(base_path, base_fs = NULL) {
  resolved <- get_path_and_filesystem(base_path, base_fs)
  fs___SubTreeFileSystem__create(resolved$path, resolved$fs)
}

#' @export
`$.SubTreeFileSystem` <- function(x, name, ...) {
  # This is to allow delegating methods/properties to the base_fs
  assert_that(is.string(name))
  # Members of the subtree itself take precedence.
  if (name %in% ls(envir = x)) {
    return(get(name, x))
  }
  # Otherwise fall back to the wrapped filesystem; unknown names yield NULL.
  base <- x$base_fs
  if (name %in% ls(envir = base)) {
    return(get(name, base))
  }
  NULL
}

#' Copy files between FileSystems
#'
#' @param from A string path to a local directory or file, a URI, or a
#' `SubTreeFileSystem`. Files will be copied recursively from this path.
#' @param to A string path to a local directory or file, a URI, or a
#' `SubTreeFileSystem`. Directories will be created as necessary
#' @param chunk_size The maximum size of block to read before flushing
#' to the destination file. A larger chunk_size will use more memory while
#' copying but may help accommodate high latency FileSystems.
#' @return Nothing: called for side effects in the file system
#' @export
#' @examplesIf FALSE
#' # Copy an S3 bucket's files to a local directory:
#' copy_files("s3://your-bucket-name", "local-directory")
#' # Using a FileSystem object
#' copy_files(s3_bucket("your-bucket-name"), "local-directory")
#' # Or go the other way, from local to S3
#' copy_files("local-directory", s3_bucket("your-bucket-name"))
copy_files <- function(from, to, chunk_size = 1024L * 1024L) {
  # Resolve each endpoint into a list(fs = , path = ) pair.
  src <- get_path_and_filesystem(from)
  dest <- get_path_and_filesystem(to)
  # Everything below the source path is selected recursively.
  selector <- FileSelector$create(src$path, recursive = TRUE)
  invisible(fs___CopyFiles(
    src$fs,
    selector,
    dest$fs,
    dest$path,
    chunk_size,
    option_use_threads()
  ))
}

clean_path_abs <- function(path) {
  # Make sure we have a valid, absolute, forward-slashed path for passing to Arrow
  # mustWork = FALSE: paths that do not exist yet are accepted without error.
  normalizePath(path, winslash = "/", mustWork = FALSE)
}

clean_path_rel <- function(path) {
  # Make sure all path separators are "/", not "\" as on Windows
  # The platform test is a scalar condition, so use plain if/else rather than
  # the vectorised ifelse().
  on_windows <- tolower(Sys.info()[["sysname"]]) == "windows"
  # "\\\\" is the regular expression for one literal backslash. On other
  # platforms "/" is replaced by itself, which leaves the text unchanged.
  path_sep <- if (on_windows) "\\\\" else "/"
  gsub(path_sep, "/", path)
}