summaryrefslogtreecommitdiffstats
path: root/src/arrow/r/tests/testthat/test-s3-minio.R
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/arrow/r/tests/testthat/test-s3-minio.R228
1 files changed, 228 insertions, 0 deletions
diff --git a/src/arrow/r/tests/testthat/test-s3-minio.R b/src/arrow/r/tests/testthat/test-s3-minio.R
new file mode 100644
index 000000000..e2c1dc2e7
--- /dev/null
+++ b/src/arrow/r/tests/testthat/test-s3-minio.R
@@ -0,0 +1,228 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+if (arrow_with_s3() && process_is_running("minio server")) {
+ # Get minio config, with expected defaults
+ minio_key <- Sys.getenv("MINIO_ACCESS_KEY", "minioadmin")
+ minio_secret <- Sys.getenv("MINIO_SECRET_KEY", "minioadmin")
+ minio_port <- Sys.getenv("MINIO_PORT", "9000")
+
+ # Helper function for minio URIs
+ minio_uri <- function(...) {
+ template <- "s3://%s:%s@%s?scheme=http&endpoint_override=localhost%s%s"
+ sprintf(template, minio_key, minio_secret, minio_path(...), "%3A", minio_port)
+ }
+ minio_path <- function(...) paste(now, ..., sep = "/")
+
+ test_that("minio setup", {
+ # Create a "bucket" on minio for this test run, which we'll delete when done.
+ fs <- S3FileSystem$create(
+ access_key = minio_key,
+ secret_key = minio_secret,
+ scheme = "http",
+ endpoint_override = paste0("localhost:", minio_port)
+ )
+ expect_r6_class(fs, "S3FileSystem")
+ now <- as.character(as.numeric(Sys.time()))
+ # If minio isn't running, this will hang for a few seconds and fail with a
+ # curl timeout, causing `run_these` to be set to FALSE and skipping the tests
+ fs$CreateDir(now)
+ })
+ # Clean up when we're all done
+ on.exit(fs$DeleteDir(now))
+
+ test_that("read/write Feather on minio", {
+ write_feather(example_data, minio_uri("test.feather"))
+ expect_identical(read_feather(minio_uri("test.feather")), example_data)
+ })
+
+ test_that("read/write Feather by filesystem, not URI", {
+ write_feather(example_data, fs$path(minio_path("test2.feather")))
+ expect_identical(
+ read_feather(fs$path(minio_path("test2.feather"))),
+ example_data
+ )
+ })
+
+ test_that("read/write stream", {
+ write_ipc_stream(example_data, fs$path(minio_path("test3.ipc")))
+ expect_identical(
+ read_ipc_stream(fs$path(minio_path("test3.ipc"))),
+ example_data
+ )
+ })
+
+ test_that("read/write Parquet on minio", {
+ skip_if_not_available("parquet")
+ write_parquet(example_data, fs$path(minio_uri("test.parquet")))
+ expect_identical(read_parquet(minio_uri("test.parquet")), example_data)
+ })
+
+ if (arrow_with_dataset()) {
+ library(dplyr)
+
+ make_temp_dir <- function() {
+ path <- tempfile()
+ dir.create(path)
+ normalizePath(path, winslash = "/")
+ }
+
+ test_that("open_dataset with an S3 file (not directory) URI", {
+ skip_if_not_available("parquet")
+ expect_identical(
+ open_dataset(minio_uri("test.parquet")) %>% collect() %>% arrange(int),
+ example_data %>% arrange(int)
+ )
+ })
+
+ test_that("open_dataset with vector of S3 file URIs", {
+ expect_identical(
+ open_dataset(
+ c(minio_uri("test.feather"), minio_uri("test2.feather")),
+ format = "feather"
+ ) %>%
+ arrange(int) %>%
+ collect(),
+ rbind(example_data, example_data) %>% arrange(int)
+ )
+ })
+
+ test_that("open_dataset errors on URIs for different file systems", {
+ td <- make_temp_dir()
+ expect_error(
+ open_dataset(
+ c(
+ minio_uri("test.feather"),
+ paste0("file://", file.path(td, "fake.feather"))
+ ),
+ format = "feather"
+ ),
+ "Vectors of URIs for different file systems are not supported"
+ )
+ })
+
+ # Dataset test setup, cf. test-dataset.R
+ first_date <- lubridate::ymd_hms("2015-04-29 03:12:39")
+ df1 <- tibble(
+ int = 1:10,
+ dbl = as.numeric(1:10),
+ lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 2),
+ chr = letters[1:10],
+ fct = factor(LETTERS[1:10]),
+ ts = first_date + lubridate::days(1:10)
+ )
+
+ second_date <- lubridate::ymd_hms("2017-03-09 07:01:02")
+ df2 <- tibble(
+ int = 101:110,
+ dbl = as.numeric(51:60),
+ lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 2),
+ chr = letters[10:1],
+ fct = factor(LETTERS[10:1]),
+ ts = second_date + lubridate::days(10:1)
+ )
+
+ # This is also to set up the dataset tests
+ test_that("write_parquet with filesystem arg", {
+ skip_if_not_available("parquet")
+ fs$CreateDir(minio_path("hive_dir", "group=1", "other=xxx"))
+ fs$CreateDir(minio_path("hive_dir", "group=2", "other=yyy"))
+ expect_length(fs$ls(minio_path("hive_dir")), 2)
+ write_parquet(df1, fs$path(minio_path("hive_dir", "group=1", "other=xxx", "file1.parquet")))
+ write_parquet(df2, fs$path(minio_path("hive_dir", "group=2", "other=yyy", "file2.parquet")))
+ expect_identical(
+ read_parquet(fs$path(minio_path("hive_dir", "group=1", "other=xxx", "file1.parquet"))),
+ df1
+ )
+ })
+
+ test_that("open_dataset with fs", {
+ ds <- open_dataset(fs$path(minio_path("hive_dir")))
+ expect_identical(
+ ds %>% select(int, dbl, lgl) %>% collect() %>% arrange(int),
+ rbind(df1[, c("int", "dbl", "lgl")], df2[, c("int", "dbl", "lgl")]) %>% arrange(int)
+ )
+ })
+
+ test_that("write_dataset with fs", {
+ ds <- open_dataset(fs$path(minio_path("hive_dir")))
+ write_dataset(ds, fs$path(minio_path("new_dataset_dir")))
+ expect_length(fs$ls(minio_path("new_dataset_dir")), 1)
+ })
+
+ test_that("Let's test copy_files too", {
+ td <- make_temp_dir()
+ copy_files(minio_uri("hive_dir"), td)
+ expect_length(dir(td), 2)
+ ds <- open_dataset(td)
+ expect_identical(
+ ds %>% select(int, dbl, lgl) %>% collect() %>% arrange(int),
+ rbind(df1[, c("int", "dbl", "lgl")], df2[, c("int", "dbl", "lgl")]) %>% arrange(int)
+ )
+
+ # Let's copy the other way and use a SubTreeFileSystem rather than URI
+ copy_files(td, fs$path(minio_path("hive_dir2")))
+ ds2 <- open_dataset(fs$path(minio_path("hive_dir2")))
+ expect_identical(
+ ds2 %>% select(int, dbl, lgl) %>% collect() %>% arrange(int),
+ rbind(df1[, c("int", "dbl", "lgl")], df2[, c("int", "dbl", "lgl")]) %>% arrange(int)
+ )
+ })
+ }
+
+ test_that("S3FileSystem input validation", {
+ expect_error(
+ S3FileSystem$create(access_key = "foo"),
+ "Key authentication requires both access_key and secret_key"
+ )
+ expect_error(
+ S3FileSystem$create(secret_key = "foo"),
+ "Key authentication requires both access_key and secret_key"
+ )
+ expect_error(
+ S3FileSystem$create(session_token = "foo"),
+ paste0(
+ "In order to initialize a session with temporary credentials, ",
+ "both secret_key and access_key must be provided ",
+ "in addition to session_token."
+ )
+ )
+ expect_error(
+ S3FileSystem$create(access_key = "foo", secret_key = "asdf", anonymous = TRUE),
+ 'Cannot specify "access_key" and "secret_key" when anonymous = TRUE'
+ )
+ expect_error(
+ S3FileSystem$create(access_key = "foo", secret_key = "asdf", role_arn = "qwer"),
+ "Cannot provide both key authentication and role_arn"
+ )
+ expect_error(
+ S3FileSystem$create(access_key = "foo", secret_key = "asdf", external_id = "qwer"),
+ 'Cannot specify "external_id" without providing a role_arn string'
+ )
+ expect_error(
+ S3FileSystem$create(external_id = "foo"),
+ 'Cannot specify "external_id" without providing a role_arn string'
+ )
+ })
+} else {
+ # Kinda hacky, let's put a skipped test here, just so we note that the tests
+ # didn't run
+ test_that("S3FileSystem tests with Minio", {
+ skip("Minio is not running")
+ })
+}