diff options
Diffstat (limited to 'src/arrow/r/tests/testthat/test-dataset-dplyr.R')
-rw-r--r-- | src/arrow/r/tests/testthat/test-dataset-dplyr.R | 340 |
1 file changed, 340 insertions, 0 deletions
diff --git a/src/arrow/r/tests/testthat/test-dataset-dplyr.R b/src/arrow/r/tests/testthat/test-dataset-dplyr.R new file mode 100644 index 000000000..b4519377c --- /dev/null +++ b/src/arrow/r/tests/testthat/test-dataset-dplyr.R @@ -0,0 +1,340 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
# dplyr integration tests for Arrow Datasets: filtering, projecting, mutating,
# sorting, counting, and materializing multi-file parquet datasets, in both a
# flat directory layout and a hive-style ("key=value") partitioned layout.
#
# Relies on fixtures defined in the package's test helpers (not in this file):
#   - df1, df2: example data frames written out as parquet files
#   - make_temp_dir(): creates a fresh temporary directory
# NOTE(review): assumes df1$int is 1:10 and df2$int starts at 101 — confirm
# against helper-data.R.

skip_if_not_available("dataset")
skip_if_not_available("parquet")

library(dplyr, warn.conflicts = FALSE)

# One dataset root with bare numeric partition subdirs, one with hive-style
# partition subdirs carrying two partition keys (group, other).
dataset_dir <- make_temp_dir()
hive_dir <- make_temp_dir()

test_that("Setup (putting data in the dir)", {
  # Flat layout: <dataset_dir>/1/file1.parquet, <dataset_dir>/2/file2.parquet
  dir.create(file.path(dataset_dir, 1))
  dir.create(file.path(dataset_dir, 2))
  write_parquet(df1, file.path(dataset_dir, 1, "file1.parquet"))
  write_parquet(df2, file.path(dataset_dir, 2, "file2.parquet"))
  expect_length(dir(dataset_dir, recursive = TRUE), 2)

  # Hive layout: partition values are encoded in the directory names
  dir.create(file.path(hive_dir, "subdir", "group=1", "other=xxx"), recursive = TRUE)
  dir.create(file.path(hive_dir, "subdir", "group=2", "other=yyy"), recursive = TRUE)
  write_parquet(df1, file.path(hive_dir, "subdir", "group=1", "other=xxx", "file1.parquet"))
  write_parquet(df2, file.path(hive_dir, "subdir", "group=2", "other=yyy", "file2.parquet"))
  expect_length(dir(hive_dir, recursive = TRUE), 2)
})

test_that("filter() with is.nan()", {
  ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
  expect_equal(
    ds %>%
      select(part, dbl) %>%
      filter(!is.nan(dbl), part == 2) %>%
      collect(),
    tibble(part = 2L, dbl = df2$dbl[!is.nan(df2$dbl)])
  )
})

test_that("filter() with %in%", {
  ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
  expect_equal(
    ds %>%
      select(int, part) %>%
      filter(int %in% c(6, 4, 3, 103, 107), part == 1) %>%
      collect(),
    tibble(int = df1$int[c(3, 4, 6)], part = 1)
  )

  # ARROW-9606: bug in %in% filter on partition column with >1 partition columns
  ds <- open_dataset(hive_dir)
  expect_equal(
    ds %>%
      filter(group %in% 2) %>%
      select(names(df2)) %>%
      collect(),
    df2
  )
})

test_that("filter() on timestamp columns", {
  ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
  expect_equal(
    ds %>%
      filter(ts >= lubridate::ymd_hms("2015-05-04 03:12:39")) %>%
      filter(part == 1) %>%
      select(ts) %>%
      collect(),
    df1[5:10, c("ts")]
  )

  # Now with a Date scalar compared against the timestamp column
  expect_equal(
    ds %>%
      filter(ts >= as.Date("2015-05-04")) %>%
      filter(part == 1) %>%
      select(ts) %>%
      collect(),
    df1[5:10, c("ts")]
  )

  # Now with bare string date; skip() stops this test here until implemented
  skip("Implement more aggressive implicit casting for scalars (ARROW-11402)")
  expect_equal(
    ds %>%
      filter(ts >= "2015-05-04") %>%
      filter(part == 1) %>%
      select(ts) %>%
      collect(),
    df1[5:10, c("ts")]
  )
})

test_that("filter() on date32 columns", {
  tmp <- tempfile()
  dir.create(tmp)
  df <- data.frame(date = as.Date(c("2020-02-02", "2020-02-03")))
  write_parquet(df, file.path(tmp, "file.parquet"))

  # Date scalar against a date32 column: only the later row survives
  expect_equal(
    open_dataset(tmp) %>%
      filter(date > as.Date("2020-02-02")) %>%
      collect() %>%
      nrow(),
    1L
  )

  # Also with timestamp scalar
  expect_equal(
    open_dataset(tmp) %>%
      filter(date > lubridate::ymd_hms("2020-02-02 00:00:00")) %>%
      collect() %>%
      nrow(),
    1L
  )
})


test_that("mutate()", {
  ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
  mutated <- ds %>%
    select(chr, dbl, int) %>%
    filter(dbl * 2 > 14 & dbl - 50 < 3L) %>%
    mutate(twice = int * 2)
  # The query plan (projection + filter expressions) is shown before collecting
  expect_output(
    print(mutated),
    "FileSystemDataset (query)
chr: string
dbl: double
int: int32
twice: double (multiply_checked(int, 2))

* Filter: ((multiply_checked(dbl, 2) > 14) and (subtract_checked(dbl, 50) < 3))
See $.data for the source Arrow object",
    fixed = TRUE
  )
  expect_equal(
    mutated %>%
      collect() %>%
      arrange(dbl),
    rbind(
      df1[8:10, c("chr", "dbl", "int")],
      df2[1:2, c("chr", "dbl", "int")]
    ) %>%
      mutate(
        twice = int * 2
      )
  )
})

test_that("mutate() features not yet implemented", {
  # FIX: ds was referenced without being defined in this test's environment
  # (test_that() does not share variables across tests), so the expectation
  # would have failed with "object 'ds' not found" instead of the intended error
  ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
  expect_error(
    ds %>%
      group_by(int) %>%
      mutate(avg = mean(int)),
    "window functions not currently supported in Arrow\nCall collect() first to pull data into R.",
    fixed = TRUE
  )
})

test_that("filter scalar validation doesn't crash (ARROW-7772)", {
  # FIX: same missing-ds defect as above
  ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
  expect_error(
    ds %>%
      filter(int == "fff", part == 1) %>%
      collect(),
    "equal has no kernel matching input types .array.int32., scalar.string.."
  )
})

test_that("collect() on Dataset works (if fits in memory)", {
  expect_equal(
    collect(open_dataset(dataset_dir)) %>% arrange(int),
    rbind(df1, df2)
  )
})

test_that("count()", {
  ds <- open_dataset(dataset_dir)
  df <- rbind(df1, df2)
  expect_equal(
    ds %>%
      filter(int > 6, int < 108) %>%
      count(chr) %>%
      arrange(chr) %>%
      collect(),
    df %>%
      filter(int > 6, int < 108) %>%
      count(chr)
  )
})

test_that("arrange()", {
  ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
  arranged <- ds %>%
    select(chr, dbl, int) %>%
    filter(dbl * 2 > 14 & dbl - 50 < 3L) %>%
    mutate(twice = int * 2) %>%
    arrange(chr, desc(twice), dbl + int)
  # Sort keys (including derived expressions) appear in the printed plan
  expect_output(
    print(arranged),
    "FileSystemDataset (query)
chr: string
dbl: double
int: int32
twice: double (multiply_checked(int, 2))

* Filter: ((multiply_checked(dbl, 2) > 14) and (subtract_checked(dbl, 50) < 3))
* Sorted by chr [asc], multiply_checked(int, 2) [desc], add_checked(dbl, int) [asc]
See $.data for the source Arrow object",
    fixed = TRUE
  )
  expect_equal(
    arranged %>%
      collect(),
    rbind(
      df1[8, c("chr", "dbl", "int")],
      df2[2, c("chr", "dbl", "int")],
      df1[9, c("chr", "dbl", "int")],
      df2[1, c("chr", "dbl", "int")],
      df1[10, c("chr", "dbl", "int")]
    ) %>%
      mutate(
        twice = int * 2
      )
  )
})

test_that("compute()/collect(as_data_frame=FALSE)", {
  ds <- open_dataset(dataset_dir)

  tab1 <- ds %>% compute()
  expect_r6_class(tab1, "Table")

  tab2 <- ds %>% collect(as_data_frame = FALSE)
  expect_r6_class(tab2, "Table")

  tab3 <- ds %>%
    mutate(negint = -int) %>%
    filter(negint > -100) %>%
    arrange(chr) %>%
    select(negint) %>%
    compute()

  expect_r6_class(tab3, "Table")

  expect_equal(
    tab3 %>% collect(),
    tibble(negint = -1:-10)
  )

  tab4 <- ds %>%
    mutate(negint = -int) %>%
    filter(negint > -100) %>%
    arrange(chr) %>%
    select(negint) %>%
    collect(as_data_frame = FALSE)

  # FIX: asserted tab3 a second time instead of tab4 (copy-paste bug),
  # leaving tab4's class unchecked
  expect_r6_class(tab4, "Table")

  expect_equal(
    tab4 %>% collect(),
    tibble(negint = -1:-10)
  )

  tab5 <- ds %>%
    mutate(negint = -int) %>%
    group_by(fct) %>%
    compute()

  # the group_by() prevents compute() from returning a Table...
  expect_s3_class(tab5, "arrow_dplyr_query")

  # ... but $.data is a Table (InMemoryDataset)...
  expect_r6_class(tab5$.data, "InMemoryDataset")
  # ... and the mutate() was evaluated
  expect_true("negint" %in% names(tab5$.data))
})

test_that("head/tail on query on dataset", {
  # head/tail on arrow_dplyr_query does not have deterministic order,
  # so without sorting we can only assert the correct number of rows
  ds <- open_dataset(dataset_dir)

  expect_identical(
    ds %>%
      filter(int > 6) %>%
      head(5) %>%
      compute() %>%
      nrow(),
    5L
  )

  expect_equal(
    ds %>%
      filter(int > 6) %>%
      arrange(int) %>%
      head() %>%
      collect(),
    rbind(df1[7:10, ], df2[1:2, ])
  )

  expect_equal(
    ds %>%
      filter(int < 105) %>%
      tail(4) %>%
      compute() %>%
      nrow(),
    4L
  )

  expect_equal(
    ds %>%
      filter(int < 105) %>%
      arrange(int) %>%
      tail() %>%
      collect(),
    rbind(df1[9:10, ], df2[1:4, ])
  )
})

test_that("dplyr method not implemented messages", {
  ds <- open_dataset(dataset_dir)
  # This one is more nuanced: the aggregate inside filter() is unsupported
  expect_error(
    ds %>% filter(int > 6, dbl > max(dbl)),
    "Filter expression not supported for Arrow Datasets: dbl > max(dbl)\nCall collect() first to pull data into R.",
    fixed = TRUE
  )
})