summaryrefslogtreecommitdiffstats
path: root/src/arrow/r/tests/testthat/test-dataset.R
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/arrow/r/tests/testthat/test-dataset.R696
1 files changed, 696 insertions, 0 deletions
diff --git a/src/arrow/r/tests/testthat/test-dataset.R b/src/arrow/r/tests/testthat/test-dataset.R
new file mode 100644
index 000000000..4403b479a
--- /dev/null
+++ b/src/arrow/r/tests/testthat/test-dataset.R
@@ -0,0 +1,696 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+skip_if_not_available("dataset")
+
+library(dplyr, warn.conflicts = FALSE)
+
+dataset_dir <- make_temp_dir()
+hive_dir <- make_temp_dir()
+ipc_dir <- make_temp_dir()
+
+test_that("Setup (putting data in the dir)", {
+ if (arrow_with_parquet()) {
+ dir.create(file.path(dataset_dir, 1))
+ dir.create(file.path(dataset_dir, 2))
+ write_parquet(df1, file.path(dataset_dir, 1, "file1.parquet"))
+ write_parquet(df2, file.path(dataset_dir, 2, "file2.parquet"))
+ expect_length(dir(dataset_dir, recursive = TRUE), 2)
+
+ dir.create(file.path(hive_dir, "subdir", "group=1", "other=xxx"), recursive = TRUE)
+ dir.create(file.path(hive_dir, "subdir", "group=2", "other=yyy"), recursive = TRUE)
+ write_parquet(df1, file.path(hive_dir, "subdir", "group=1", "other=xxx", "file1.parquet"))
+ write_parquet(df2, file.path(hive_dir, "subdir", "group=2", "other=yyy", "file2.parquet"))
+ expect_length(dir(hive_dir, recursive = TRUE), 2)
+ }
+
+ # Now, an IPC format dataset
+ dir.create(file.path(ipc_dir, 3))
+ dir.create(file.path(ipc_dir, 4))
+ write_feather(df1, file.path(ipc_dir, 3, "file1.arrow"))
+ write_feather(df2, file.path(ipc_dir, 4, "file2.arrow"))
+ expect_length(dir(ipc_dir, recursive = TRUE), 2)
+})
+
+test_that("IPC/Feather format data", {
+ ds <- open_dataset(ipc_dir, partitioning = "part", format = "feather")
+ expect_r6_class(ds$format, "IpcFileFormat")
+ expect_r6_class(ds$filesystem, "LocalFileSystem")
+ expect_identical(names(ds), c(names(df1), "part"))
+ expect_identical(dim(ds), c(20L, 7L))
+
+ expect_equal(
+ ds %>%
+ select(string = chr, integer = int, part) %>%
+ filter(integer > 6 & part == 3) %>%
+ collect() %>%
+ summarize(mean = mean(integer)),
+ df1 %>%
+ select(string = chr, integer = int) %>%
+ filter(integer > 6) %>%
+ summarize(mean = mean(integer))
+ )
+
+ # Collecting virtual partition column works
+ expect_equal(
+ ds %>% arrange(part) %>% pull(part),
+ c(rep(3, 10), rep(4, 10))
+ )
+})
+
+expect_scan_result <- function(ds, schm) {
+ sb <- ds$NewScan()
+ expect_r6_class(sb, "ScannerBuilder")
+ expect_equal(sb$schema, schm)
+
+ sb$Project(c("chr", "lgl"))
+ sb$Filter(Expression$field_ref("dbl") == 8)
+ scn <- sb$Finish()
+ expect_r6_class(scn, "Scanner")
+
+ tab <- scn$ToTable()
+ expect_r6_class(tab, "Table")
+
+ expect_equal(
+ as.data.frame(tab),
+ df1[8, c("chr", "lgl")]
+ )
+}
+
+test_that("URI-decoding with directory partitioning", {
+ root <- make_temp_dir()
+ fmt <- FileFormat$create("feather")
+ fs <- LocalFileSystem$create()
+ selector <- FileSelector$create(root, recursive = TRUE)
+ dir1 <- file.path(root, "2021-05-04 00%3A00%3A00", "%24")
+ dir.create(dir1, recursive = TRUE)
+ write_feather(df1, file.path(dir1, "data.feather"))
+
+ partitioning <- DirectoryPartitioning$create(
+ schema(date = timestamp(unit = "s"), string = utf8())
+ )
+ factory <- FileSystemDatasetFactory$create(
+ fs, selector, NULL, fmt,
+ partitioning = partitioning
+ )
+ schm <- factory$Inspect()
+ ds <- factory$Finish(schm)
+ expect_scan_result(ds, schm)
+
+ partitioning <- DirectoryPartitioning$create(
+ schema(date = timestamp(unit = "s"), string = utf8()),
+ segment_encoding = "none"
+ )
+ factory <- FileSystemDatasetFactory$create(
+ fs, selector, NULL, fmt,
+ partitioning = partitioning
+ )
+ schm <- factory$Inspect()
+ expect_error(factory$Finish(schm), "Invalid: error parsing")
+
+ partitioning_factory <- DirectoryPartitioningFactory$create(
+ c("date", "string")
+ )
+ factory <- FileSystemDatasetFactory$create(
+ fs, selector, NULL, fmt, partitioning_factory
+ )
+ schm <- factory$Inspect()
+ ds <- factory$Finish(schm)
+ # Can't directly inspect partition expressions, so do it implicitly via scan
+ expect_equal(
+ ds %>%
+ filter(date == "2021-05-04 00:00:00", string == "$") %>%
+ select(int) %>%
+ collect(),
+ df1 %>% select(int) %>% collect()
+ )
+
+ partitioning_factory <- DirectoryPartitioningFactory$create(
+ c("date", "string"),
+ segment_encoding = "none"
+ )
+ factory <- FileSystemDatasetFactory$create(
+ fs, selector, NULL, fmt, partitioning_factory
+ )
+ schm <- factory$Inspect()
+ ds <- factory$Finish(schm)
+ expect_equal(
+ ds %>%
+ filter(date == "2021-05-04 00%3A00%3A00", string == "%24") %>%
+ select(int) %>%
+ collect(),
+ df1 %>% select(int) %>% collect()
+ )
+})
+
+test_that("URI-decoding with hive partitioning", {
+ root <- make_temp_dir()
+ fmt <- FileFormat$create("feather")
+ fs <- LocalFileSystem$create()
+ selector <- FileSelector$create(root, recursive = TRUE)
+ dir1 <- file.path(root, "date=2021-05-04 00%3A00%3A00", "string=%24")
+ dir.create(dir1, recursive = TRUE)
+ write_feather(df1, file.path(dir1, "data.feather"))
+
+ partitioning <- hive_partition(
+ date = timestamp(unit = "s"), string = utf8()
+ )
+ factory <- FileSystemDatasetFactory$create(
+ fs, selector, NULL, fmt,
+ partitioning = partitioning
+ )
+ ds <- factory$Finish(schm)
+ expect_scan_result(ds, schm)
+
+ partitioning <- hive_partition(
+ date = timestamp(unit = "s"), string = utf8(), segment_encoding = "none"
+ )
+ factory <- FileSystemDatasetFactory$create(
+ fs, selector, NULL, fmt,
+ partitioning = partitioning
+ )
+ expect_error(factory$Finish(schm), "Invalid: error parsing")
+
+ partitioning_factory <- hive_partition()
+ factory <- FileSystemDatasetFactory$create(
+ fs, selector, NULL, fmt, partitioning_factory
+ )
+ schm <- factory$Inspect()
+ ds <- factory$Finish(schm)
+ # Can't directly inspect partition expressions, so do it implicitly via scan
+ expect_equal(
+ ds %>%
+ filter(date == "2021-05-04 00:00:00", string == "$") %>%
+ select(int) %>%
+ collect(),
+ df1 %>% select(int) %>% collect()
+ )
+
+ partitioning_factory <- hive_partition(segment_encoding = "none")
+ factory <- FileSystemDatasetFactory$create(
+ fs, selector, NULL, fmt, partitioning_factory
+ )
+ schm <- factory$Inspect()
+ ds <- factory$Finish(schm)
+ expect_equal(
+ ds %>%
+ filter(date == "2021-05-04 00%3A00%3A00", string == "%24") %>%
+ select(int) %>%
+ collect(),
+ df1 %>% select(int) %>% collect()
+ )
+})
+
+# Everything else below here is using parquet files
+skip_if_not_available("parquet")
+
+files <- c(
+ file.path(dataset_dir, 1, "file1.parquet", fsep = "/"),
+ file.path(dataset_dir, 2, "file2.parquet", fsep = "/")
+)
+
+test_that("Simple interface for datasets", {
+ ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
+ expect_r6_class(ds$format, "ParquetFileFormat")
+ expect_r6_class(ds$filesystem, "LocalFileSystem")
+ expect_r6_class(ds, "Dataset")
+ expect_equal(
+ ds %>%
+ select(chr, dbl) %>%
+ filter(dbl > 7 & dbl < 53L) %>% # Testing the auto-casting of scalars
+ collect() %>%
+ arrange(dbl),
+ rbind(
+ df1[8:10, c("chr", "dbl")],
+ df2[1:2, c("chr", "dbl")]
+ )
+ )
+
+ expect_equal(
+ ds %>%
+ select(string = chr, integer = int, part) %>%
+ filter(integer > 6 & part == 1) %>% # 6 not 6L to test autocasting
+ collect() %>%
+ summarize(mean = mean(integer)),
+ df1 %>%
+ select(string = chr, integer = int) %>%
+ filter(integer > 6) %>%
+ summarize(mean = mean(integer))
+ )
+
+ # Collecting virtual partition column works
+ expect_equal(
+ ds %>% arrange(part) %>% pull(part),
+ c(rep(1, 10), rep(2, 10))
+ )
+})
+
+test_that("dim method returns the correct number of rows and columns", {
+ ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
+ expect_identical(dim(ds), c(20L, 7L))
+})
+
+
+test_that("dim() correctly determine numbers of rows and columns on arrow_dplyr_query object", {
+ ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
+
+ expect_identical(
+ ds %>%
+ filter(chr == "a") %>%
+ dim(),
+ c(2L, 7L)
+ )
+ expect_equal(
+ ds %>%
+ select(chr, fct, int) %>%
+ dim(),
+ c(20L, 3L)
+ )
+ expect_identical(
+ ds %>%
+ select(chr, fct, int) %>%
+ filter(chr == "a") %>%
+ dim(),
+ c(2L, 3L)
+ )
+})
+
+test_that("Simple interface for datasets (custom ParquetFileFormat)", {
+ ds <- open_dataset(dataset_dir,
+ partitioning = schema(part = uint8()),
+ format = FileFormat$create("parquet", dict_columns = c("chr"))
+ )
+ expect_type_equal(ds$schema$GetFieldByName("chr")$type, dictionary())
+})
+
+test_that("Hive partitioning", {
+ ds <- open_dataset(hive_dir, partitioning = hive_partition(other = utf8(), group = uint8()))
+ expect_r6_class(ds, "Dataset")
+ expect_equal(
+ ds %>%
+ filter(group == 2) %>%
+ select(chr, dbl) %>%
+ filter(dbl > 7 & dbl < 53) %>%
+ collect() %>%
+ arrange(dbl),
+ df2[1:2, c("chr", "dbl")]
+ )
+})
+
+test_that("input validation", {
+ expect_error(
+ open_dataset(hive_dir, hive_partition(other = utf8(), group = uint8()))
+ )
+})
+
+test_that("Partitioning inference", {
+ # These are the same tests as above, just using the *PartitioningFactory
+ ds1 <- open_dataset(dataset_dir, partitioning = "part")
+ expect_identical(names(ds1), c(names(df1), "part"))
+ expect_equal(
+ ds1 %>%
+ select(string = chr, integer = int, part) %>%
+ filter(integer > 6 & part == 1) %>%
+ collect() %>%
+ summarize(mean = mean(integer)),
+ df1 %>%
+ select(string = chr, integer = int) %>%
+ filter(integer > 6) %>%
+ summarize(mean = mean(integer))
+ )
+
+ ds2 <- open_dataset(hive_dir)
+ expect_identical(names(ds2), c(names(df1), "group", "other"))
+ expect_equal(
+ ds2 %>%
+ filter(group == 2) %>%
+ select(chr, dbl) %>%
+ filter(dbl > 7 & dbl < 53) %>%
+ collect() %>%
+ arrange(dbl),
+ df2[1:2, c("chr", "dbl")]
+ )
+})
+
+test_that("Dataset with multiple file formats", {
+ skip("https://issues.apache.org/jira/browse/ARROW-7653")
+ ds <- open_dataset(list(
+ open_dataset(dataset_dir, format = "parquet", partitioning = "part"),
+ open_dataset(ipc_dir, format = "arrow", partitioning = "part")
+ ))
+ expect_identical(names(ds), c(names(df1), "part"))
+ expect_equal(
+ ds %>%
+ filter(int > 6 & part %in% c(1, 3)) %>%
+ select(string = chr, integer = int) %>%
+ collect(),
+ df1 %>%
+ select(string = chr, integer = int) %>%
+ filter(integer > 6) %>%
+ rbind(., .) # Stack it twice
+ )
+})
+
+test_that("Creating UnionDataset", {
+ ds1 <- open_dataset(file.path(dataset_dir, 1))
+ ds2 <- open_dataset(file.path(dataset_dir, 2))
+ union1 <- open_dataset(list(ds1, ds2))
+ expect_r6_class(union1, "UnionDataset")
+ expect_equal(
+ union1 %>%
+ select(chr, dbl) %>%
+ filter(dbl > 7 & dbl < 53L) %>% # Testing the auto-casting of scalars
+ collect() %>%
+ arrange(dbl),
+ rbind(
+ df1[8:10, c("chr", "dbl")],
+ df2[1:2, c("chr", "dbl")]
+ )
+ )
+
+ # Now with the c() method
+ union2 <- c(ds1, ds2)
+ expect_r6_class(union2, "UnionDataset")
+ expect_equal(
+ union2 %>%
+ select(chr, dbl) %>%
+ filter(dbl > 7 & dbl < 53L) %>% # Testing the auto-casting of scalars
+ collect() %>%
+ arrange(dbl),
+ rbind(
+ df1[8:10, c("chr", "dbl")],
+ df2[1:2, c("chr", "dbl")]
+ )
+ )
+
+ # Confirm c() method error handling
+ expect_error(c(ds1, 42), "character")
+})
+
+test_that("map_batches", {
+ skip("map_batches() is broken (ARROW-14029)")
+ ds <- open_dataset(dataset_dir, partitioning = "part")
+ expect_equal(
+ ds %>%
+ filter(int > 5) %>%
+ select(int, lgl) %>%
+ map_batches(~ summarize(., min_int = min(int))),
+ tibble(min_int = c(6L, 101L))
+ )
+})
+
+test_that("partitioning = NULL to ignore partition information (but why?)", {
+ ds <- open_dataset(hive_dir, partitioning = NULL)
+ expect_identical(names(ds), names(df1)) # i.e. not c(names(df1), "group", "other")
+})
+
+test_that("head/tail", {
+ # head/tail with no query are still deterministic order
+ ds <- open_dataset(dataset_dir)
+ expect_equal(as.data.frame(head(ds)), head(df1))
+ expect_equal(
+ as.data.frame(head(ds, 12)),
+ rbind(df1, df2[1:2, ])
+ )
+
+ expect_equal(as.data.frame(tail(ds)), tail(df2))
+ expect_equal(
+ as.data.frame(tail(ds, 12)),
+ rbind(df1[9:10, ], df2)
+ )
+})
+
+test_that("Dataset [ (take by index)", {
+ ds <- open_dataset(dataset_dir)
+ # Taking only from one file
+ expect_equal(
+ as.data.frame(ds[c(4, 5, 9), 3:4]),
+ df1[c(4, 5, 9), 3:4]
+ )
+ # Taking from more than one
+ expect_equal(
+ as.data.frame(ds[c(4, 5, 9, 12, 13), 3:4]),
+ rbind(df1[c(4, 5, 9), 3:4], df2[2:3, 3:4])
+ )
+ # Taking out of order
+ expect_equal(
+ as.data.frame(ds[c(4, 13, 9, 12, 5), ]),
+ rbind(
+ df1[4, ],
+ df2[3, ],
+ df1[9, ],
+ df2[2, ],
+ df1[5, ]
+ )
+ )
+
+ # Take from a query
+ ds2 <- ds %>%
+ filter(int > 6) %>%
+ select(int, lgl)
+ expect_equal(
+ as.data.frame(ds2[c(2, 5), ]),
+ rbind(
+ df1[8, c("int", "lgl")],
+ df2[1, c("int", "lgl")]
+ )
+ )
+})
+
+test_that("Dataset and query print methods", {
+ ds <- open_dataset(hive_dir)
+ expect_output(
+ print(ds),
+ paste(
+ "FileSystemDataset with 2 Parquet files",
+ "int: int32",
+ "dbl: double",
+ "lgl: bool",
+ "chr: string",
+ "fct: dictionary<values=string, indices=int32>",
+ "ts: timestamp[us, tz=UTC]",
+ "group: int32",
+ "other: string",
+ sep = "\n"
+ ),
+ fixed = TRUE
+ )
+ expect_type(ds$metadata, "list")
+ q <- select(ds, string = chr, lgl, integer = int)
+ expect_output(
+ print(q),
+ paste(
+ "Dataset (query)",
+ "string: string",
+ "lgl: bool",
+ "integer: int32",
+ "",
+ "See $.data for the source Arrow object",
+ sep = "\n"
+ ),
+ fixed = TRUE
+ )
+ expect_output(
+ print(q %>% filter(integer == 6) %>% group_by(lgl)),
+ paste(
+ "Dataset (query)",
+ "string: string",
+ "lgl: bool",
+ "integer: int32",
+ "",
+ "* Filter: (int == 6)",
+ "* Grouped by lgl",
+ "See $.data for the source Arrow object",
+ sep = "\n"
+ ),
+ fixed = TRUE
+ )
+})
+
+test_that("Scanner$ScanBatches", {
+ ds <- open_dataset(ipc_dir, format = "feather")
+ batches <- ds$NewScan()$Finish()$ScanBatches()
+ table <- Table$create(!!!batches)
+ expect_equal(as.data.frame(table), rbind(df1, df2))
+
+ batches <- ds$NewScan()$UseAsync(TRUE)$Finish()$ScanBatches()
+ table <- Table$create(!!!batches)
+ expect_equal(as.data.frame(table), rbind(df1, df2))
+})
+
+test_that("Scanner$ToRecordBatchReader()", {
+ ds <- open_dataset(dataset_dir, partitioning = "part")
+ scan <- ds %>%
+ filter(part == 1) %>%
+ select(int, lgl) %>%
+ filter(int > 6) %>%
+ Scanner$create()
+ reader <- scan$ToRecordBatchReader()
+ expect_r6_class(reader, "RecordBatchReader")
+ expect_identical(
+ as.data.frame(reader$read_table()),
+ df1[df1$int > 6, c("int", "lgl")]
+ )
+})
+
+test_that("Scanner$create() filter/projection pushdown", {
+ ds <- open_dataset(dataset_dir, partitioning = "part")
+
+ # the standard to compare all Scanner$create()s against
+ scan_one <- ds %>%
+ filter(int > 7 & dbl < 57) %>%
+ select(int, dbl, lgl) %>%
+ mutate(int_plus = int + 1, dbl_minus = dbl - 1) %>%
+ Scanner$create()
+
+ # select a column in projection
+ scan_two <- ds %>%
+ filter(int > 7 & dbl < 57) %>%
+ # select an extra column, since we are going to
+ select(int, dbl, lgl, chr) %>%
+ mutate(int_plus = int + 1, dbl_minus = dbl - 1) %>%
+ Scanner$create(projection = c("int", "dbl", "lgl", "int_plus", "dbl_minus"))
+ expect_identical(
+ as.data.frame(scan_one$ToRecordBatchReader()$read_table()),
+ as.data.frame(scan_two$ToRecordBatchReader()$read_table())
+ )
+
+ # adding filters to Scanner$create
+ scan_three <- ds %>%
+ filter(int > 7) %>%
+ select(int, dbl, lgl) %>%
+ mutate(int_plus = int + 1, dbl_minus = dbl - 1) %>%
+ Scanner$create(
+ filter = Expression$create("less", Expression$field_ref("dbl"), Expression$scalar(57))
+ )
+ expect_identical(
+ as.data.frame(scan_one$ToRecordBatchReader()$read_table()),
+ as.data.frame(scan_three$ToRecordBatchReader()$read_table())
+ )
+
+ expect_error(
+ ds %>%
+ select(int, dbl, lgl) %>%
+ Scanner$create(projection = "not_a_col"),
+ # Full message is "attempting to project with unknown columns" >= 4.0.0, but
+ # prior versions have a less nice "all(projection %in% names(proj)) is not TRUE"
+ "project"
+ )
+
+ expect_error(
+ ds %>%
+ select(int, dbl, lgl) %>%
+ Scanner$create(filter = list("foo", "bar")),
+ "filter expressions must be either an expression or a list of expressions"
+ )
+})
+
+test_that("Assembling a Dataset manually and getting a Table", {
+ fs <- LocalFileSystem$create()
+ selector <- FileSelector$create(dataset_dir, recursive = TRUE)
+ partitioning <- DirectoryPartitioning$create(schema(part = double()))
+
+ fmt <- FileFormat$create("parquet")
+ factory <- FileSystemDatasetFactory$create(fs, selector, NULL, fmt, partitioning = partitioning)
+ expect_r6_class(factory, "FileSystemDatasetFactory")
+
+ schm <- factory$Inspect()
+ expect_r6_class(schm, "Schema")
+
+ phys_schm <- ParquetFileReader$create(files[1])$GetSchema()
+ expect_equal(names(phys_schm), names(df1))
+ expect_equal(names(schm), c(names(phys_schm), "part"))
+
+ child <- factory$Finish(schm)
+ expect_r6_class(child, "FileSystemDataset")
+ expect_r6_class(child$schema, "Schema")
+ expect_r6_class(child$format, "ParquetFileFormat")
+ expect_equal(names(schm), names(child$schema))
+ expect_equal(child$files, files)
+
+ ds <- Dataset$create(list(child), schm)
+ expect_scan_result(ds, schm)
+})
+
+test_that("Assembling multiple DatasetFactories with DatasetFactory", {
+ factory1 <- dataset_factory(file.path(dataset_dir, 1), format = "parquet")
+ expect_r6_class(factory1, "FileSystemDatasetFactory")
+ factory2 <- dataset_factory(file.path(dataset_dir, 2), format = "parquet")
+ expect_r6_class(factory2, "FileSystemDatasetFactory")
+
+ factory <- DatasetFactory$create(list(factory1, factory2))
+ expect_r6_class(factory, "DatasetFactory")
+
+ schm <- factory$Inspect()
+ expect_r6_class(schm, "Schema")
+
+ phys_schm <- ParquetFileReader$create(files[1])$GetSchema()
+ expect_equal(names(phys_schm), names(df1))
+
+ ds <- factory$Finish(schm)
+ expect_r6_class(ds, "UnionDataset")
+ expect_r6_class(ds$schema, "Schema")
+ expect_equal(names(schm), names(ds$schema))
+ expect_equal(unlist(map(ds$children, ~ .$files)), files)
+
+ expect_scan_result(ds, schm)
+})
+
+# see https://issues.apache.org/jira/browse/ARROW-11328
+test_that("Collecting zero columns from a dataset doesn't return entire dataset", {
+ tmp <- tempfile()
+ write_dataset(mtcars, tmp, format = "parquet")
+ expect_equal(
+ open_dataset(tmp) %>% select() %>% collect() %>% dim(),
+ c(32, 0)
+ )
+})
+
+
+test_that("dataset RecordBatchReader to C-interface to arrow_dplyr_query", {
+ ds <- open_dataset(ipc_dir, partitioning = "part", format = "feather")
+
+ # export the RecordBatchReader via the C-interface
+ stream_ptr <- allocate_arrow_array_stream()
+ scan <- Scanner$create(ds)
+ reader <- scan$ToRecordBatchReader()
+ reader$export_to_c(stream_ptr)
+
+ # then import it and check that the roundtripped value is the same
+ circle <- RecordBatchStreamReader$import_from_c(stream_ptr)
+
+ # create an arrow_dplyr_query() from the recordbatch reader
+ reader_adq <- arrow_dplyr_query(circle)
+
+ # TODO: ARROW-14321 should be able to arrange then collect
+ tab_from_c_new <- reader_adq %>%
+ filter(int < 8, int > 55) %>%
+ mutate(part_plus = part + 6) %>%
+ collect()
+ expect_equal(
+ tab_from_c_new %>%
+ arrange(dbl),
+ ds %>%
+ filter(int < 8, int > 55) %>%
+ mutate(part_plus = part + 6) %>%
+ collect() %>%
+ arrange(dbl)
+ )
+
+ # must clean up the pointer or we leak
+ delete_arrow_array_stream(stream_ptr)
+})