diff options
Diffstat (limited to '')
-rw-r--r-- | src/arrow/r/src/filesystem.cpp | 329 |
1 files changed, 329 insertions, 0 deletions
diff --git a/src/arrow/r/src/filesystem.cpp b/src/arrow/r/src/filesystem.cpp new file mode 100644 index 000000000..b7ec60536 --- /dev/null +++ b/src/arrow/r/src/filesystem.cpp @@ -0,0 +1,329 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "./arrow_types.h" + +#if defined(ARROW_R_WITH_ARROW) + +#include <arrow/filesystem/filesystem.h> +#include <arrow/filesystem/localfs.h> + +namespace fs = ::arrow::fs; +namespace io = ::arrow::io; + +namespace cpp11 { + +const char* r6_class_name<fs::FileSystem>::get( + const std::shared_ptr<fs::FileSystem>& file_system) { + auto type_name = file_system->type_name(); + + if (type_name == "local") { + return "LocalFileSystem"; + } else if (type_name == "s3") { + return "S3FileSystem"; + } else if (type_name == "subtree") { + return "SubTreeFileSystem"; + } else { + return "FileSystem"; + } +} + +} // namespace cpp11 + +// [[arrow::export]] +fs::FileType fs___FileInfo__type(const std::shared_ptr<fs::FileInfo>& x) { + return x->type(); +} + +// [[arrow::export]] +void fs___FileInfo__set_type(const std::shared_ptr<fs::FileInfo>& x, fs::FileType type) { + x->set_type(type); +} + +// [[arrow::export]] +std::string fs___FileInfo__path(const std::shared_ptr<fs::FileInfo>& x) { + return x->path(); +} + +// [[arrow::export]] +void fs___FileInfo__set_path(const std::shared_ptr<fs::FileInfo>& x, + const std::string& path) { + x->set_path(path); +} + +// [[arrow::export]] +int64_t fs___FileInfo__size(const std::shared_ptr<fs::FileInfo>& x) { return x->size(); } + +// [[arrow::export]] +void fs___FileInfo__set_size(const std::shared_ptr<fs::FileInfo>& x, int64_t size) { + x->set_size(size); +} + +// [[arrow::export]] +std::string fs___FileInfo__base_name(const std::shared_ptr<fs::FileInfo>& x) { + return x->base_name(); +} + +// [[arrow::export]] +std::string fs___FileInfo__extension(const std::shared_ptr<fs::FileInfo>& x) { + return x->extension(); +} + +// [[arrow::export]] +SEXP fs___FileInfo__mtime(const std::shared_ptr<fs::FileInfo>& x) { + SEXP res = PROTECT(Rf_allocVector(REALSXP, 1)); + // .mtime() gets us nanoseconds since epoch, POSIXct is seconds since epoch as a double + REAL(res)[0] = static_cast<double>(x->mtime().time_since_epoch().count()) / 1000000000; + Rf_classgets(res, arrow::r::data::classes_POSIXct); + UNPROTECT(1); + return res; +} + +// [[arrow::export]] +void fs___FileInfo__set_mtime(const std::shared_ptr<fs::FileInfo>& x, SEXP time) { + auto nanosecs = + std::chrono::nanoseconds(static_cast<int64_t>(REAL(time)[0] * 1000000000)); + x->set_mtime(fs::TimePoint(nanosecs)); +} + +// Selector + +// [[arrow::export]] +std::string fs___FileSelector__base_dir( + const std::shared_ptr<fs::FileSelector>& selector) { + return selector->base_dir; +} + +// [[arrow::export]] +bool fs___FileSelector__allow_not_found( + const std::shared_ptr<fs::FileSelector>& selector) { + return selector->allow_not_found; +} + +// [[arrow::export]] +bool fs___FileSelector__recursive(const std::shared_ptr<fs::FileSelector>& selector) { + return selector->recursive; +} + +// [[arrow::export]] +std::shared_ptr<fs::FileSelector> fs___FileSelector__create(const std::string& base_dir, + bool allow_not_found, + bool recursive) { + auto selector = std::make_shared<fs::FileSelector>(); + selector->base_dir = base_dir; + selector->allow_not_found = allow_not_found; + selector->recursive = recursive; + return selector; +} + +// FileSystem + +template <typename T> +std::vector<std::shared_ptr<T>> shared_ptr_vector(const std::vector<T>& vec) { + std::vector<std::shared_ptr<fs::FileInfo>> res(vec.size()); + std::transform(vec.begin(), vec.end(), res.begin(), + [](const fs::FileInfo& x) { return std::make_shared<fs::FileInfo>(x); }); + return res; +} + +// [[arrow::export]] +cpp11::list fs___FileSystem__GetTargetInfos_Paths( + const std::shared_ptr<fs::FileSystem>& file_system, + const std::vector<std::string>& paths) { + auto results = ValueOrStop(file_system->GetFileInfo(paths)); + return arrow::r::to_r_list(shared_ptr_vector(results)); +} + +// [[arrow::export]] +cpp11::list fs___FileSystem__GetTargetInfos_FileSelector( + const std::shared_ptr<fs::FileSystem>& file_system, + const std::shared_ptr<fs::FileSelector>& selector) { + auto results = ValueOrStop(file_system->GetFileInfo(*selector)); + + return arrow::r::to_r_list(shared_ptr_vector(results)); +} + +// [[arrow::export]] +void fs___FileSystem__CreateDir(const std::shared_ptr<fs::FileSystem>& file_system, + const std::string& path, bool recursive) { + StopIfNotOk(file_system->CreateDir(path, recursive)); +} + +// [[arrow::export]] +void fs___FileSystem__DeleteDir(const std::shared_ptr<fs::FileSystem>& file_system, + const std::string& path) { + StopIfNotOk(file_system->DeleteDir(path)); +} + +// [[arrow::export]] +void fs___FileSystem__DeleteDirContents( + const std::shared_ptr<fs::FileSystem>& file_system, const std::string& path) { + StopIfNotOk(file_system->DeleteDirContents(path)); +} + +// [[arrow::export]] +void fs___FileSystem__DeleteFile(const std::shared_ptr<fs::FileSystem>& file_system, + const std::string& path) { + StopIfNotOk(file_system->DeleteFile(path)); +} + +// [[arrow::export]] +void fs___FileSystem__DeleteFiles(const std::shared_ptr<fs::FileSystem>& file_system, + const std::vector<std::string>& paths) { + StopIfNotOk(file_system->DeleteFiles(paths)); +} + +// [[arrow::export]] +void fs___FileSystem__Move(const std::shared_ptr<fs::FileSystem>& file_system, + const std::string& src, const std::string& dest) { + StopIfNotOk(file_system->Move(src, dest)); +} + +// [[arrow::export]] +void fs___FileSystem__CopyFile(const std::shared_ptr<fs::FileSystem>& file_system, + const std::string& src, const std::string& dest) { + StopIfNotOk(file_system->CopyFile(src, dest)); +} + +// [[arrow::export]] +std::shared_ptr<arrow::io::InputStream> fs___FileSystem__OpenInputStream( + const std::shared_ptr<fs::FileSystem>& file_system, const std::string& path) { + return ValueOrStop(file_system->OpenInputStream(path)); +} + +// [[arrow::export]] +std::shared_ptr<arrow::io::RandomAccessFile> fs___FileSystem__OpenInputFile( + const std::shared_ptr<fs::FileSystem>& file_system, const std::string& path) { + return ValueOrStop(file_system->OpenInputFile(path)); +} + +// [[arrow::export]] +std::shared_ptr<arrow::io::OutputStream> fs___FileSystem__OpenOutputStream( + const std::shared_ptr<fs::FileSystem>& file_system, const std::string& path) { + return ValueOrStop(file_system->OpenOutputStream(path)); +} + +// [[arrow::export]] +std::shared_ptr<arrow::io::OutputStream> fs___FileSystem__OpenAppendStream( + const std::shared_ptr<fs::FileSystem>& file_system, const std::string& path) { + return ValueOrStop(file_system->OpenAppendStream(path)); +} + +// [[arrow::export]] +std::string fs___FileSystem__type_name( + const std::shared_ptr<fs::FileSystem>& file_system) { + return file_system->type_name(); +} + +// [[arrow::export]] +std::shared_ptr<fs::LocalFileSystem> fs___LocalFileSystem__create() { + // Affects OpenInputFile/OpenInputStream + auto io_context = arrow::io::IOContext(gc_memory_pool()); + return std::make_shared<fs::LocalFileSystem>(io_context); +} + +// [[arrow::export]] +std::shared_ptr<fs::SubTreeFileSystem> fs___SubTreeFileSystem__create( + const std::string& base_path, const std::shared_ptr<fs::FileSystem>& base_fs) { + return std::make_shared<fs::SubTreeFileSystem>(base_path, base_fs); +} + +// [[arrow::export]] +std::shared_ptr<fs::FileSystem> fs___SubTreeFileSystem__base_fs( + const std::shared_ptr<fs::SubTreeFileSystem>& file_system) { + return file_system->base_fs(); +} + +// [[arrow::export]] +std::string fs___SubTreeFileSystem__base_path( + const std::shared_ptr<fs::SubTreeFileSystem>& file_system) { + return file_system->base_path(); +} + +// [[arrow::export]] +cpp11::writable::list fs___FileSystemFromUri(const std::string& path) { + using cpp11::literals::operator"" _nm; + + std::string out_path; + return cpp11::writable::list( + {"fs"_nm = cpp11::to_r6(ValueOrStop(fs::FileSystemFromUri(path, &out_path))), + "path"_nm = out_path}); +} + +// [[arrow::export]] +void fs___CopyFiles(const std::shared_ptr<fs::FileSystem>& source_fs, + const std::shared_ptr<fs::FileSelector>& source_sel, + const std::shared_ptr<fs::FileSystem>& destination_fs, + const std::string& destination_base_dir, + int64_t chunk_size = 1024 * 1024, bool use_threads = true) { + StopIfNotOk(fs::CopyFiles(source_fs, *source_sel, destination_fs, destination_base_dir, + io::default_io_context(), chunk_size, use_threads)); +} + +#endif + +#if defined(ARROW_R_WITH_S3) + +#include <arrow/filesystem/s3fs.h> + +// [[s3::export]] +std::shared_ptr<fs::S3FileSystem> fs___S3FileSystem__create( + bool anonymous = false, std::string access_key = "", std::string secret_key = "", + std::string session_token = "", std::string role_arn = "", + std::string session_name = "", std::string external_id = "", int load_frequency = 900, + std::string region = "", std::string endpoint_override = "", std::string scheme = "", + bool background_writes = true) { + fs::S3Options s3_opts; + // Handle auth (anonymous, keys, default) + // (validation/internal coherence handled in R) + if (anonymous) { + s3_opts = fs::S3Options::Anonymous(); + } else if (access_key != "" && secret_key != "") { + s3_opts = fs::S3Options::FromAccessKey(access_key, secret_key, session_token); + } else if (role_arn != "") { + s3_opts = fs::S3Options::FromAssumeRole(role_arn, session_name, external_id, + load_frequency); + } else { + s3_opts = fs::S3Options::Defaults(); + } + + // Now handle the rest of the options + /// AWS region to connect to (default determined by AWS SDK) + if (region != "") { + s3_opts.region = region; + } + /// If non-empty, override region with a connect string such as "localhost:9000" + s3_opts.endpoint_override = endpoint_override; + /// S3 connection transport, default "https" + if (scheme != "") { + s3_opts.scheme = scheme; + } + /// Whether OutputStream writes will be issued in the background, without blocking + /// default true + s3_opts.background_writes = background_writes; + + StopIfNotOk(fs::EnsureS3Initialized()); + auto io_context = arrow::io::IOContext(gc_memory_pool()); + return ValueOrStop(fs::S3FileSystem::Make(s3_opts, io_context)); +} + +// [[s3::export]] +std::string fs___S3FileSystem__region(const std::shared_ptr<fs::S3FileSystem>& fs) { + return fs->region(); +} + +#endif |