// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "./arrow_types.h" #if defined(ARROW_R_WITH_ARROW) #include #include // [[arrow::export]] std::shared_ptr RecordBatchReader__schema( const std::shared_ptr& reader) { return reader->schema(); } // [[arrow::export]] std::shared_ptr RecordBatchReader__ReadNext( const std::shared_ptr& reader) { std::shared_ptr batch; StopIfNotOk(reader->ReadNext(&batch)); return batch; } // [[arrow::export]] cpp11::list RecordBatchReader__batches( const std::shared_ptr& reader) { std::vector> res; StopIfNotOk(reader->ReadAll(&res)); return arrow::r::to_r_list(res); } // [[arrow::export]] std::shared_ptr Table__from_RecordBatchReader( const std::shared_ptr& reader) { std::shared_ptr table = nullptr; StopIfNotOk(reader->ReadAll(&table)); return table; } // -------- RecordBatchStreamReader // [[arrow::export]] std::shared_ptr ipc___RecordBatchStreamReader__Open( const std::shared_ptr& stream) { auto options = arrow::ipc::IpcReadOptions::Defaults(); options.memory_pool = gc_memory_pool(); return ValueOrStop(arrow::ipc::RecordBatchStreamReader::Open(stream, options)); } // -------- RecordBatchFileReader // [[arrow::export]] std::shared_ptr ipc___RecordBatchFileReader__schema( const std::shared_ptr& reader) { return reader->schema(); } // [[arrow::export]] int ipc___RecordBatchFileReader__num_record_batches( const std::shared_ptr& reader) { return reader->num_record_batches(); } // [[arrow::export]] std::shared_ptr ipc___RecordBatchFileReader__ReadRecordBatch( const std::shared_ptr& reader, int i) { if (i < 0 && i >= reader->num_record_batches()) { cpp11::stop("Record batch index out of bounds"); } return ValueOrStop(reader->ReadRecordBatch(i)); } // [[arrow::export]] std::shared_ptr ipc___RecordBatchFileReader__Open( const std::shared_ptr& file) { auto options = arrow::ipc::IpcReadOptions::Defaults(); options.memory_pool = gc_memory_pool(); return ValueOrStop(arrow::ipc::RecordBatchFileReader::Open(file, options)); } // [[arrow::export]] std::shared_ptr Table__from_RecordBatchFileReader( const std::shared_ptr& reader) { // RecordBatchStreamReader inherits from RecordBatchReader // but RecordBatchFileReader apparently does not int num_batches = reader->num_record_batches(); std::vector> batches(num_batches); for (int i = 0; i < num_batches; i++) { batches[i] = ValueOrStop(reader->ReadRecordBatch(i)); } return ValueOrStop(arrow::Table::FromRecordBatches(std::move(batches))); } // [[arrow::export]] cpp11::list ipc___RecordBatchFileReader__batches( const std::shared_ptr& reader) { auto n = reader->num_record_batches(); std::vector> res(n); for (int i = 0; i < n; i++) { res[i] = ValueOrStop(reader->ReadRecordBatch(i)); } return arrow::r::to_r_list(res); } #endif