diff options
Diffstat (limited to 'src/arrow/r/R/duckdb.R')
-rw-r--r-- | src/arrow/r/R/duckdb.R | 165 |
1 files changed, 165 insertions, 0 deletions
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

#' Create a (virtual) DuckDB table from an Arrow object
#'
#' This will do the necessary configuration to create a (virtual) table in DuckDB
#' that is backed by the Arrow object given. No data is copied or modified until
#' `collect()` or `compute()` are called or a query is run against the table.
#'
#' The result is a dbplyr-compatible object that can be used in d(b)plyr pipelines.
#'
#' If `auto_disconnect = TRUE`, the DuckDB table that is created will be configured
#' to be unregistered when the `tbl` object is garbage collected. This is helpful
#' if you don't want to have extra table objects in DuckDB after you've finished
#' using them. Currently, this cleanup can, however, sometimes lead to hangs if
#' tables are created and deleted in quick succession, hence the default value
#' of `FALSE`.
#'
#' @param .data the Arrow object (e.g. Dataset, Table) to use for the DuckDB table
#' @param con a DuckDB connection to use (default will create one and store it
#' in `options("arrow_duck_con")`)
#' @param table_name a name to use in DuckDB for this object. The default is a
#' unique string `"arrow_"` followed by numbers.
#' @param auto_disconnect should the table be automatically cleaned up when the
#' resulting object is removed (and garbage collected)? Default: `FALSE`
#'
#' @return A `tbl` of the new table in DuckDB
#'
#' @name to_duckdb
#' @export
#' @examplesIf getFromNamespace("run_duckdb_examples", "arrow")()
#' library(dplyr)
#'
#' ds <- InMemoryDataset$create(mtcars)
#'
#' ds %>%
#'   filter(mpg < 30) %>%
#'   to_duckdb() %>%
#'   group_by(cyl) %>%
#'   summarize(mean_mpg = mean(mpg, na.rm = TRUE))
to_duckdb <- function(.data,
                      con = arrow_duck_connection(),
                      table_name = unique_arrow_tablename(),
                      auto_disconnect = FALSE) {
  .data <- as_adq(.data)
  duckdb::duckdb_register_arrow(con, table_name, .data)

  # Qualify tbl() explicitly so the lookup does not depend on dplyr being
  # attached by the caller.
  tbl <- dplyr::tbl(con, table_name)

  # Carry the grouping of the Arrow query over to the DuckDB tbl.
  # dplyr::groups() yields a list of symbols, so it has to be spliced into
  # group_by() with `!!!`; passing the list itself would be read as a single
  # grouping expression called `groups`.
  groups <- dplyr::groups(.data)
  if (length(groups) > 0) {
    tbl <- dplyr::group_by(tbl, !!!groups)
  }

  if (auto_disconnect) {
    # this will add the correct connection disconnection when the tbl is gced.
    # we should probably confirm that this use of src$disco is kosher.
    tbl$src$disco <- duckdb_disconnector(con, table_name)
  }

  tbl
}

# Returns the DuckDB connection stored in `options("arrow_duck_con")`,
# creating (and storing) a new one when none is stored or the stored one is
# no longer valid.
arrow_duck_connection <- function() {
  con <- getOption("arrow_duck_con")
  if (is.null(con) || !DBI::dbIsValid(con)) {
    con <- DBI::dbConnect(duckdb::duckdb())
    # Use the same CPU count that the arrow library is set to
    DBI::dbExecute(con, paste0("PRAGMA threads=", cpu_count()))
    options(arrow_duck_con = con)
  }
  con
}

# helper function to determine if duckdb examples should run
# see: https://github.com/r-lib/roxygen2/issues/1242
run_duckdb_examples <- function() {
  arrow_with_dataset() &&
    requireNamespace("duckdb", quietly = TRUE) &&
    packageVersion("duckdb") > "0.2.7" &&
    requireNamespace("dplyr", quietly = TRUE) &&
    requireNamespace("dbplyr", quietly = TRUE)
}

# Adapted from dbplyr
# Increments the counter kept in `options("arrow_table_name")` and returns a
# name of the form "arrow_001", "arrow_002", ...
unique_arrow_tablename <- function() {
  i <- getOption("arrow_table_name", 0) + 1
  options(arrow_table_name = i)
  sprintf("arrow_%03i", i)
}

# Creates an environment that disconnects the database when it's GC'd
duckdb_disconnector <- function(con, tbl_name) {
  # Evaluate both arguments now so the finalizer does not have to resolve
  # lazily evaluated promises at garbage-collection time.
  force(con)
  force(tbl_name)

  reg.finalizer(environment(), function(...) {
    # Only touch the connection while it is still valid: there is nothing to
    # unregister or shut down otherwise, and querying an invalid connection
    # from inside a finalizer may raise an error.
    if (!DBI::dbIsValid(con)) {
      return(invisible(NULL))
    }

    # remove the table we ephemerally created
    duckdb::duckdb_unregister_arrow(con, tbl_name)

    # and there are no more tables, so we can safely shutdown
    if (length(DBI::dbListTables(con)) == 0) {
      DBI::dbDisconnect(con, shutdown = TRUE)
    }
  })
  environment()
}

#' Create an Arrow object from others
#'
#' This can be used in pipelines that pass data back and forth between Arrow and
#' other processes (like DuckDB).
#'
#' @param .data the object to be converted
#'
#' @return an `arrow_dplyr_query` object, to be used in dplyr pipelines.
#' @export
#'
#' @examplesIf getFromNamespace("run_duckdb_examples", "arrow")()
#' library(dplyr)
#'
#' ds <- InMemoryDataset$create(mtcars)
#'
#' ds %>%
#'   filter(mpg < 30) %>%
#'   to_duckdb() %>%
#'   group_by(cyl) %>%
#'   summarize(mean_mpg = mean(mpg, na.rm = TRUE)) %>%
#'   to_arrow() %>%
#'   collect()
to_arrow <- function(.data) {
  # If this is an Arrow object already, return quickly since we're already Arrow
  if (inherits(.data, c("arrow_dplyr_query", "ArrowObject"))) {
    return(.data)
  }

  # For now, we only handle .data from duckdb, so check that it is that if we've
  # gotten this far
  con <- dbplyr::remote_con(.data)
  if (!inherits(con, "duckdb_connection")) {
    stop(
      "to_arrow() currently only supports Arrow tables, Arrow datasets, ",
      "Arrow queries, or dbplyr tbls from duckdb connections",
      call. = FALSE
    )
  }

  # Run the query
  res <- DBI::dbSendQuery(
    con,
    dbplyr::remote_query(.data),
    arrow = TRUE
  )

  # TODO: we shouldn't need $read_table(), but we get segfaults when we do.
  arrow_dplyr_query(duckdb::duckdb_fetch_record_batch(res)$read_table())
}