---
title: "Working with Arrow Datasets and dplyr"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Working with Arrow Datasets and dplyr}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

Apache Arrow lets you work efficiently with large, multi-file datasets.
The arrow R package provides a [dplyr](https://dplyr.tidyverse.org/) interface to Arrow Datasets,
and other tools for interactive exploration of Arrow data.

This vignette introduces Datasets and shows how to use dplyr to analyze them.

## Example: NYC taxi data

The [New York City taxi trip record data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
is widely used in big data exercises and competitions.
For demonstration purposes, we have hosted a Parquet-formatted version
of about ten years of the trip data in a public Amazon S3 bucket.

The total file size is around 37 gigabytes, even in the efficient Parquet file
format. That's bigger than memory on most people's computers, so you can't just
read it all in and stack it into a single data frame.

S3 support is included in the Windows (for R > 3.6) and macOS binary packages.
On Linux, when installing from source, S3 support is not enabled by default,
and it has additional system requirements.
See `vignette("install", package = "arrow")` for details.
To see if your arrow installation has S3 support, run:

```{r}
arrow::arrow_with_s3()
```

Even with S3 support enabled, network speed will be a bottleneck unless your
machine is located in the same AWS region as the data. So, for this vignette,
we assume that the NYC taxi dataset has been downloaded locally in an "nyc-taxi"
directory.

### Retrieving data from a public Amazon S3 bucket

If your arrow build has S3 support, you can sync the data locally with:

```{r, eval = FALSE}
arrow::copy_files("s3://ursa-labs-taxi-data", "nyc-taxi")
```

If your arrow build doesn't have S3 support, you can download the files
with some additional code:

```{r, eval = FALSE}
bucket <- "https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com"
for (year in 2009:2019) {
  if (year == 2019) {
    # We only have through June 2019 there
    months <- 1:6
  } else {
    months <- 1:12
  }
  for (month in sprintf("%02d", months)) {
    dir.create(file.path("nyc-taxi", year, month), recursive = TRUE)
    try(download.file(
      paste(bucket, year, month, "data.parquet", sep = "/"),
      file.path("nyc-taxi", year, month, "data.parquet"),
      mode = "wb"
    ), silent = TRUE)
  }
}
```

Note that these download steps in the vignette are not executed: if you want to run
with live data, you'll have to do it yourself separately.
Given the size, if you're running this locally and don't have a fast connection,
feel free to grab only a year or two of data.
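
For example, to copy just one year you could point `copy_files()` at the
corresponding subdirectory of the bucket. This is only a sketch: it assumes your
build has S3 support and relies on the `year/month` layout of the bucket shown above:

```r
# Copy only the 2019 files into a matching local subdirectory
arrow::copy_files("s3://ursa-labs-taxi-data/2019", "nyc-taxi/2019")
```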

If you don't have the taxi data downloaded, the vignette will still run and will
yield previously cached output for reference. To be explicit about which version
is running, let's check whether you're running with live data:

```{r}
dir.exists("nyc-taxi")
```

## Opening the dataset

Because dplyr is not necessary for many Arrow workflows,
it is an optional (`Suggests`) dependency. So, to work with Datasets,
you need to load both arrow and dplyr.

```{r}
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)
```

The first step is to create a Dataset object, pointing at the directory of data.

```{r, eval = file.exists("nyc-taxi")}
ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
```

The file format for `open_dataset()` is controlled by the `format` parameter,
which has a default value of `"parquet"`. If you had a directory
of Arrow format files, you could instead specify `format = "arrow"` in the call.

Other supported formats include:

* `"feather"` or `"ipc"` (aliases for `"arrow"`, as Feather v2 is the Arrow file format)
* `"csv"` (comma-delimited files) and `"tsv"` (tab-delimited files)
* `"text"` (generic text-delimited files - use the `delimiter` argument to specify which to use)

For text files, you can pass the following parsing options to `open_dataset()`:

* `delim`
* `quote`
* `escape_double`
* `escape_backslash`
* `skip_empty_rows`

For more information on the usage of these parameters, see `?read_delim_arrow()`.

The `partitioning` argument lets you specify how the file paths provide information
about how the dataset is chunked into different files. The files in this example
have file paths like

```
2009/01/data.parquet
2009/02/data.parquet
...
```

By providing `c("year", "month")` to the `partitioning` argument, you're saying that the first
path segment gives the value for `year`, and the second segment is `month`.
Every row in `2009/01/data.parquet` has a value of 2009 for `year`
and 1 for `month`, even though those columns may not be present in the file.

Indeed, when you look at the dataset, you can see that in addition to the columns present
in every file, there are also columns `year` and `month` even though they are not present in the files themselves.

```{r, eval = file.exists("nyc-taxi")}
ds
```
```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
cat("
FileSystemDataset with 125 Parquet files
vendor_id: string
pickup_at: timestamp[us]
dropoff_at: timestamp[us]
passenger_count: int8
trip_distance: float
pickup_longitude: float
pickup_latitude: float
rate_code_id: null
store_and_fwd_flag: string
dropoff_longitude: float
dropoff_latitude: float
payment_type: string
fare_amount: float
extra: float
mta_tax: float
tip_amount: float
tolls_amount: float
total_amount: float
year: int32
month: int32

See $metadata for additional Schema metadata
")
```

The other form of partitioning currently supported is [Hive](https://hive.apache.org/)-style,
in which the partition variable names are included in the path segments.
If you had saved your files in paths like:

```
year=2009/month=01/data.parquet
year=2009/month=02/data.parquet
...
```

you would not have had to provide the names in `partitioning`;
you could have just called `ds <- open_dataset("nyc-taxi")` and the partitions
would have been detected automatically.

## Querying the dataset

Up to this point, you haven't loaded any data. You've walked directories to find
files, you've parsed file paths to identify partitions, and you've read the
headers of the Parquet files to inspect their schemas so that you can make sure
they all are as expected.
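
If you want to see what was discovered without scanning any rows, you can
inspect the Dataset object itself. The sketch below assumes the `FileSystemDataset`
created above and uses its `$files` and `$schema` fields:

```r
head(ds$files)  # paths of the individual Parquet files backing the Dataset
ds$schema       # the unified schema, including the partition columns
```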

In the current release, arrow supports the dplyr verbs `mutate()`,
`transmute()`, `select()`, `rename()`, `relocate()`, `filter()`, and
`arrange()`. Aggregation is not yet supported, so before you call `summarise()`
or other verbs with aggregate functions, use `collect()` to pull the selected
subset of the data into an in-memory R data frame.

If you attempt to call unsupported dplyr verbs or unimplemented functions in
your query on an Arrow Dataset, the arrow package raises an error. However,
for dplyr queries on Arrow Table objects (which are already in memory), the
package automatically calls `collect()` before processing that dplyr verb.

Here's an example: suppose that you are curious about tipping behavior among the
longest taxi rides. Let's find the median tip percentage for rides with
fares greater than $100 in 2015, broken down by the number of passengers:

```{r, eval = file.exists("nyc-taxi")}
system.time(ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount) %>%
  group_by(passenger_count) %>%
  collect() %>%
  summarise(
    median_tip_pct = median(tip_pct),
    n = n()
  ) %>%
  print())
```

```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
cat("
# A tibble: 10 x 3
   passenger_count median_tip_pct      n
             <int>          <dbl>  <int>
 1               0           9.84    380
 2               1          16.7  143087
 3               2          16.6   34418
 4               3          14.4    8922
 5               4          11.4    4771
 6               5          16.7    5806
 7               6          16.7    3338
 8               7          16.7      11
 9               8          16.7      32
10               9          16.7      42

   user  system elapsed
  4.436   1.012   1.402
")
```

You've just selected a subset out of a dataset with around 2 billion rows, computed
a new column, and aggregated it in under 2 seconds on a modern laptop. How does
this work?

First, `mutate()`/`transmute()`, `select()`/`rename()`/`relocate()`, `filter()`,
`group_by()`, and `arrange()` record their actions but don't evaluate on the
data until you run `collect()`.

```{r, eval = file.exists("nyc-taxi")}
ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount) %>%
  group_by(passenger_count)
```

```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
cat("
FileSystemDataset (query)
tip_amount: float
total_amount: float
passenger_count: int8
tip_pct: expr

* Filter: ((total_amount > 100) and (year == 2015))
* Grouped by passenger_count
See $.data for the source Arrow object
")
```

This code returns output instantly and shows the manipulations you've made, without
loading data from the files. Because the evaluation of these queries is deferred,
you can build up a query that selects down to a small subset without generating
intermediate datasets that would potentially be large.

Second, all work is pushed down to the individual data files,
and depending on the file format, chunks of data within the files. As a result,
you can select a subset of data from a much larger dataset by collecting the
smaller slices from each file—you don't have to load the whole dataset in
memory to slice from it.

Third, because of partitioning, you can ignore some files entirely.
In this example, by filtering `year == 2015`, all files corresponding to other years
are immediately excluded: you don't have to load them in order to find that no
rows match the filter. Relatedly, since Parquet files contain row groups with
statistics on the data within, there may be entire chunks of data you can
avoid scanning because they have no rows where `total_amount > 100`.

## More dataset options

There are a few ways you can control the Dataset creation to adapt to special use cases.

### Work with files in a directory

If you are working with a single file or a set of files that are not all in the
same directory, you can provide a file path or a vector of multiple file paths
to `open_dataset()`. This is useful if, for example, you have a single CSV file
that is too big to read into memory. You could pass the file path to
`open_dataset()`, use `group_by()` to partition the Dataset into manageable chunks,
then use `write_dataset()` to write each chunk to a separate Parquet file—all
without needing to read the full CSV file into R.
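
A minimal sketch of that workflow might look like this (the file name
`huge_file.csv` and the `year` column used for chunking are hypothetical):

```r
# Open the CSV lazily as a Dataset, then write it back out as
# partitioned Parquet without reading the whole file into R
big_csv <- open_dataset("huge_file.csv", format = "csv")

big_csv %>%
  group_by(year) %>%
  write_dataset("huge_file_parquet", format = "parquet")
```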

### Explicitly declare column names and data types

You can specify the `schema` argument to `open_dataset()` to declare the columns
and their data types. This is useful if you have data files that have different
storage schema (for example, a column could be `int32` in one and `int8` in
another) and you want to ensure that the resulting Dataset has a specific type.

To be clear, it's not necessary to specify a schema, even in this example of
mixed integer types, because the Dataset constructor will reconcile differences
like these. The schema specification just lets you declare what you want the
result to be.

### Explicitly declare partition format

Similarly, you can provide a Schema in the `partitioning` argument of `open_dataset()`
in order to declare the types of the virtual columns that define the partitions.
This would be useful, in the taxi dataset example, if you wanted to keep
`month` as a string instead of an integer.
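
For the taxi data, that might look like the following sketch (not evaluated here),
which keeps `year` as an integer but reads `month` as a string such as `"01"`:

```r
ds <- open_dataset(
  "nyc-taxi",
  partitioning = schema(year = int32(), month = utf8())
)
```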

### Work with multiple data sources

Another feature of Datasets is that they can be composed of multiple data sources.
That is, you may have a directory of partitioned Parquet files in one location,
and in another directory, files that haven't been partitioned.
Or, you could point to an S3 bucket of Parquet data and a directory
of CSVs on the local file system and query them together as a single dataset.
To create a multi-source dataset, provide a list of datasets to `open_dataset()`
instead of a file path, or simply concatenate them like `big_dataset <- c(ds1, ds2)`.

## Writing datasets

As you can see, querying a large dataset can be made quite fast by storing it in an
efficient binary columnar format like Parquet or Feather and by partitioning it on
columns commonly used for filtering. However, data isn't always stored that way.
Sometimes you might start with one giant CSV. The first step in analyzing data
is cleaning it up and reshaping it into a more usable form.

The `write_dataset()` function allows you to take a Dataset or another tabular
data object—an Arrow Table or RecordBatch, or an R data frame—and write
it to a different file format, partitioned into multiple files.

Assume that you have a version of the NYC Taxi data as CSV:

```r
ds <- open_dataset("nyc-taxi/csv/", format = "csv")
```

You can write it to a new location and translate the files to the Feather format
by calling `write_dataset()` on it:

```r
write_dataset(ds, "nyc-taxi/feather", format = "feather")
```

Next, let's imagine that the `payment_type` column is something you often filter
on, so you want to partition the data by that variable. By doing so you ensure
that a filter like `payment_type == "Cash"` will touch only a subset of files
where `payment_type` is always `"Cash"`.

One natural way to express the columns you want to partition on is to use the
`group_by()` method:

```r
ds %>%
  group_by(payment_type) %>%
  write_dataset("nyc-taxi/feather", format = "feather")
```

This will write files to a directory tree that looks like this:

```r
system("tree nyc-taxi/feather")
```

```
## feather
## ├── payment_type=1
## │   └── part-18.feather
## ├── payment_type=2
## │   └── part-19.feather
## ...
## └── payment_type=UNK
##     └── part-17.feather
##
## 18 directories, 23 files
```

Note that the directory names are `payment_type=Cash` and similar:
this is the Hive-style partitioning described above. This means that when
you call `open_dataset()` on this directory, you don't have to declare what the
partitions are because they can be read from the file paths.
(To instead write bare values for partition segments, i.e. `Cash` rather than
`payment_type=Cash`, call `write_dataset()` with `hive_style = FALSE`.)

Perhaps, though, `payment_type == "Cash"` is the only data you ever care about,
and you just want to drop the rest and have a smaller working set.
For this, you can `filter()` them out when writing:

```r
ds %>%
  filter(payment_type == "Cash") %>%
  write_dataset("nyc-taxi/feather", format = "feather")
```

The other thing you can do when writing datasets is select a subset of columns
or reorder them. Suppose you never care about `vendor_id`, and being a string column,
it can take up a lot of space when you read it in, so let's drop it:

```r
ds %>%
  group_by(payment_type) %>%
  select(-vendor_id) %>%
  write_dataset("nyc-taxi/feather", format = "feather")
```

Note that while you can select a subset of columns,
you cannot currently rename columns when writing a dataset.
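
As a final check (again just a sketch, not evaluated here), you could reopen the
partitioned copy written above; because the files were written with Hive-style
paths, the `payment_type` partition is detected from the directory names without
a `partitioning` argument:

```r
ds_feather <- open_dataset("nyc-taxi/feather", format = "feather")
```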