summaryrefslogtreecommitdiffstats
path: root/src/arrow/r/tests/testthat/test-record-batch-reader.R
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/arrow/r/tests/testthat/test-record-batch-reader.R141
1 files changed, 141 insertions, 0 deletions
diff --git a/src/arrow/r/tests/testthat/test-record-batch-reader.R b/src/arrow/r/tests/testthat/test-record-batch-reader.R
new file mode 100644
index 000000000..3992670dc
--- /dev/null
+++ b/src/arrow/r/tests/testthat/test-record-batch-reader.R
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+test_that("RecordBatchStreamReader / Writer", {
+ tbl <- tibble::tibble(
+ x = 1:10,
+ y = letters[1:10]
+ )
+ batch <- record_batch(tbl)
+ tab <- Table$create(tbl)
+
+ sink <- BufferOutputStream$create()
+ expect_equal(sink$tell(), 0)
+ writer <- RecordBatchStreamWriter$create(sink, batch$schema)
+ expect_r6_class(writer, "RecordBatchWriter")
+ writer$write(batch)
+ writer$write(tab)
+ writer$write(tbl)
+ expect_true(sink$tell() > 0)
+ writer$close()
+
+ buf <- sink$finish()
+ expect_r6_class(buf, "Buffer")
+
+ reader <- RecordBatchStreamReader$create(buf)
+ expect_r6_class(reader, "RecordBatchStreamReader")
+
+ batch1 <- reader$read_next_batch()
+ expect_r6_class(batch1, "RecordBatch")
+ expect_equal(batch, batch1)
+ batch2 <- reader$read_next_batch()
+ expect_r6_class(batch2, "RecordBatch")
+ expect_equal(batch, batch2)
+ batch3 <- reader$read_next_batch()
+ expect_r6_class(batch3, "RecordBatch")
+ expect_equal(batch, batch3)
+ expect_null(reader$read_next_batch())
+})
+
+test_that("RecordBatchFileReader / Writer", {
+ sink <- BufferOutputStream$create()
+ writer <- RecordBatchFileWriter$create(sink, batch$schema)
+ expect_r6_class(writer, "RecordBatchWriter")
+ writer$write(batch)
+ writer$write(tab)
+ writer$write(tbl)
+ writer$close()
+
+ buf <- sink$finish()
+ expect_r6_class(buf, "Buffer")
+
+ reader <- RecordBatchFileReader$create(buf)
+ expect_r6_class(reader, "RecordBatchFileReader")
+
+ batch1 <- reader$get_batch(0)
+ expect_r6_class(batch1, "RecordBatch")
+ expect_equal(batch, batch1)
+
+ expect_equal(reader$num_record_batches, 3)
+})
+
+test_that("StreamReader read_table", {
+ sink <- BufferOutputStream$create()
+ writer <- RecordBatchStreamWriter$create(sink, batch$schema)
+ expect_r6_class(writer, "RecordBatchWriter")
+ writer$write(batch)
+ writer$write(tab)
+ writer$write(tbl)
+ writer$close()
+ buf <- sink$finish()
+
+ reader <- RecordBatchStreamReader$create(buf)
+ out <- reader$read_table()
+ expect_identical(dim(out), c(30L, 2L))
+})
+
+test_that("FileReader read_table", {
+ sink <- BufferOutputStream$create()
+ writer <- RecordBatchFileWriter$create(sink, batch$schema)
+ expect_r6_class(writer, "RecordBatchWriter")
+ writer$write(batch)
+ writer$write(tab)
+ writer$write(tbl)
+ writer$close()
+ buf <- sink$finish()
+
+ reader <- RecordBatchFileReader$create(buf)
+ out <- reader$read_table()
+ expect_identical(dim(out), c(30L, 2L))
+})
+
+test_that("MetadataFormat", {
+ expect_identical(get_ipc_metadata_version(5), 4L)
+ expect_identical(get_ipc_metadata_version("V4"), 3L)
+ expect_identical(get_ipc_metadata_version(NULL), 4L)
+ Sys.setenv(ARROW_PRE_0_15_IPC_FORMAT = 1)
+ expect_identical(get_ipc_metadata_version(NULL), 3L)
+ Sys.setenv(ARROW_PRE_0_15_IPC_FORMAT = "")
+
+ expect_identical(get_ipc_metadata_version(NULL), 4L)
+ Sys.setenv(ARROW_PRE_1_0_METADATA_VERSION = 1)
+ expect_identical(get_ipc_metadata_version(NULL), 3L)
+ Sys.setenv(ARROW_PRE_1_0_METADATA_VERSION = "")
+
+ expect_error(
+ get_ipc_metadata_version(99),
+ "99 is not a valid IPC MetadataVersion"
+ )
+ expect_error(
+ get_ipc_metadata_version("45"),
+ '"45" is not a valid IPC MetadataVersion'
+ )
+})
+
+test_that("reader with 0 batches", {
+ # IPC stream containing only a schema (ARROW-10642)
+ sink <- BufferOutputStream$create()
+ writer <- RecordBatchStreamWriter$create(sink, schema(a = int32()))
+ writer$close()
+ buf <- sink$finish()
+
+ reader <- RecordBatchStreamReader$create(buf)
+ tab <- reader$read_table()
+ expect_r6_class(tab, "Table")
+ expect_identical(dim(tab), c(0L, 1L))
+})