Diffstat (limited to '')
-rw-r--r--  src/arrow/r/vignettes/dataset.Rmd  421
1 file changed, 421 insertions, 0 deletions
diff --git a/src/arrow/r/vignettes/dataset.Rmd b/src/arrow/r/vignettes/dataset.Rmd
new file mode 100644
index 000000000..3f33cbae4
--- /dev/null
+++ b/src/arrow/r/vignettes/dataset.Rmd
@@ -0,0 +1,421 @@
+---
+title: "Working with Arrow Datasets and dplyr"
+output: rmarkdown::html_vignette
+vignette: >
+ %\VignetteIndexEntry{Working with Arrow Datasets and dplyr}
+ %\VignetteEngine{knitr::rmarkdown}
+ %\VignetteEncoding{UTF-8}
+---
+
+Apache Arrow lets you work efficiently with large, multi-file datasets.
+The arrow R package provides a [dplyr](https://dplyr.tidyverse.org/) interface to Arrow Datasets,
+and other tools for interactive exploration of Arrow data.
+
+This vignette introduces Datasets and shows how to use dplyr to analyze them.
+
+## Example: NYC taxi data
+
+The [New York City taxi trip record data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
+is widely used in big data exercises and competitions.
+For demonstration purposes, we have hosted a Parquet-formatted version
+of about ten years of the trip data in a public Amazon S3 bucket.
+
+The total file size is around 37 gigabytes, even in the efficient Parquet file
+format. That's bigger than memory on most people's computers, so you can't just
+read it all in and stack it into a single data frame.
+
+S3 support is included in the Windows (for R > 3.6) and macOS binary packages.
+On Linux, when installing from source, S3 support is not enabled by default,
+and it has additional system requirements.
+See `vignette("install", package = "arrow")` for details.
+To see if your arrow installation has S3 support, run:
+
+```{r}
+arrow::arrow_with_s3()
+```
+
+Even with S3 support enabled, network speed will be a bottleneck unless your
+machine is located in the same AWS region as the data. So, for this vignette,
+we assume that the NYC taxi dataset has been downloaded locally in an "nyc-taxi"
+directory.
+
+### Retrieving data from a public Amazon S3 bucket
+
+If your arrow build has S3 support, you can sync the data locally with:
+
+```{r, eval = FALSE}
+arrow::copy_files("s3://ursa-labs-taxi-data", "nyc-taxi")
+```
+
+If your arrow build doesn't have S3 support, you can download the files
+with some additional code:
+
+```{r, eval = FALSE}
+bucket <- "https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com"
+for (year in 2009:2019) {
+ if (year == 2019) {
+ # We only have through June 2019 there
+ months <- 1:6
+ } else {
+ months <- 1:12
+ }
+ for (month in sprintf("%02d", months)) {
+ dir.create(file.path("nyc-taxi", year, month), recursive = TRUE)
+ try(download.file(
+ paste(bucket, year, month, "data.parquet", sep = "/"),
+ file.path("nyc-taxi", year, month, "data.parquet"),
+ mode = "wb"
+ ), silent = TRUE)
+ }
+}
+```
+
+Note that these download steps are not executed when the vignette is built: if you want to run
+with live data, you'll have to run them yourself separately.
+Given the size, if you're running this locally and don't have a fast connection,
+feel free to grab only a year or two of data.
+
+If you don't have the taxi data downloaded, the vignette will still run and will
+yield previously cached output for reference. To be explicit about which version
+is running, let's check whether you're running with live data:
+
+```{r}
+dir.exists("nyc-taxi")
+```
+
+## Opening the dataset
+
+Because dplyr is not necessary for many Arrow workflows, it is an optional
+(`Suggests`) dependency of arrow. So, to work with Datasets through the dplyr
+interface, you need to load both arrow and dplyr.
+
+```{r}
+library(arrow, warn.conflicts = FALSE)
+library(dplyr, warn.conflicts = FALSE)
+```
+
+The first step is to create a Dataset object, pointing at the directory of data.
+
+```{r, eval = file.exists("nyc-taxi")}
+ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
+```
+
+The file format for `open_dataset()` is controlled by the `format` parameter,
+which has a default value of `"parquet"`. If you had a directory
+of Arrow format files, you could instead specify `format = "arrow"` in the call.
+
+Other supported formats include:
+
+* `"feather"` or `"ipc"` (aliases for `"arrow"`, as Feather v2 is the Arrow file format)
+* `"csv"` (comma-delimited files) and `"tsv"` (tab-delimited files)
+* `"text"` (generic text-delimited files - use the `delimiter` argument to specify which to use)
+
+For text files, you can pass the following parsing options to `open_dataset()`:
+
+* `delim`
+* `quote`
+* `escape_double`
+* `escape_backslash`
+* `skip_empty_rows`
+
+For more information on the usage of these parameters, see `?read_delim_arrow()`.
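+
+For example, here is a minimal sketch of opening a directory of pipe-delimited
+text files; the `"my-data/"` path and the delimiter are hypothetical stand-ins
+for your own data:
+
+```{r, eval = FALSE}
+# Hypothetical directory of pipe-delimited files; adjust the path and options to your data
+ds_text <- open_dataset(
+  "my-data/",
+  format = "text",
+  delim = "|",
+  skip_empty_rows = TRUE
+)
+```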
+
+The `partitioning` argument lets you specify how the file paths provide information
+about how the dataset is chunked into different files. The files in this example
+have file paths like
+
+```
+2009/01/data.parquet
+2009/02/data.parquet
+...
+```
+
+By providing `c("year", "month")` to the `partitioning` argument, you're saying that the first
+path segment gives the value for `year`, and the second segment is `month`.
+Every row in `2009/01/data.parquet` has a value of 2009 for `year`
+and 1 for `month`, even though those columns may not be present in the file.
+
+Indeed, when you look at the dataset, you can see that in addition to the columns present
+in every file, there are also columns `year` and `month` even though they are not present in the files themselves.
+
+```{r, eval = file.exists("nyc-taxi")}
+ds
+```
+```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
+cat("
+FileSystemDataset with 125 Parquet files
+vendor_id: string
+pickup_at: timestamp[us]
+dropoff_at: timestamp[us]
+passenger_count: int8
+trip_distance: float
+pickup_longitude: float
+pickup_latitude: float
+rate_code_id: null
+store_and_fwd_flag: string
+dropoff_longitude: float
+dropoff_latitude: float
+payment_type: string
+fare_amount: float
+extra: float
+mta_tax: float
+tip_amount: float
+tolls_amount: float
+total_amount: float
+year: int32
+month: int32
+
+See $metadata for additional Schema metadata
+")
+```
+
+The other form of partitioning currently supported is [Hive](https://hive.apache.org/)-style,
+in which the partition variable names are included in the path segments.
+If you had saved your files in paths like:
+
+```
+year=2009/month=01/data.parquet
+year=2009/month=02/data.parquet
+...
+```
+
+you would not have had to provide the names in `partitioning`;
+you could have just called `ds <- open_dataset("nyc-taxi")` and the partitions
+would have been detected automatically.
+
+## Querying the dataset
+
+Up to this point, you haven't loaded any data. You've walked directories to find
+files, you've parsed file paths to identify partitions, and you've read the
+headers of the Parquet files to inspect their schemas so that you can make sure
+they are all as expected.
+
+In the current release, arrow supports the dplyr verbs `mutate()`,
+`transmute()`, `select()`, `rename()`, `relocate()`, `filter()`, and
+`arrange()`. Aggregation is not yet supported, so before you call `summarise()`
+or other verbs with aggregate functions, use `collect()` to pull the selected
+subset of the data into an in-memory R data frame.
+
+If you attempt to call unsupported dplyr verbs or unimplemented functions in your
+query on an Arrow Dataset, the arrow package raises an error. However, for dplyr
+queries on Arrow Table objects (which are already in memory), the package
+automatically calls `collect()` before processing that dplyr verb.
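+
+For instance, here is a minimal sketch of the Table behavior, using the built-in
+`mtcars` data frame rather than the taxi data:
+
+```{r, eval = FALSE}
+# Tables are already in memory, so an unsupported verb such as summarise()
+# triggers an automatic collect() instead of an error
+tab <- Table$create(mtcars)
+tab %>%
+  group_by(cyl) %>%
+  summarise(mean_mpg = mean(mpg))
+```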
+
+Here's an example: suppose you are curious about tipping behavior on the most
+expensive taxi rides. Let's find the median tip percentage for rides with
+fares greater than $100 in 2015, broken down by the number of passengers:
+
+```{r, eval = file.exists("nyc-taxi")}
+system.time(ds %>%
+ filter(total_amount > 100, year == 2015) %>%
+ select(tip_amount, total_amount, passenger_count) %>%
+ mutate(tip_pct = 100 * tip_amount / total_amount) %>%
+ group_by(passenger_count) %>%
+ collect() %>%
+ summarise(
+ median_tip_pct = median(tip_pct),
+ n = n()
+ ) %>%
+ print())
+```
+
+```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
+cat("
+# A tibble: 10 x 3
+ passenger_count median_tip_pct n
+ <int> <dbl> <int>
+ 1 0 9.84 380
+ 2 1 16.7 143087
+ 3 2 16.6 34418
+ 4 3 14.4 8922
+ 5 4 11.4 4771
+ 6 5 16.7 5806
+ 7 6 16.7 3338
+ 8 7 16.7 11
+ 9 8 16.7 32
+10 9 16.7 42
+
+ user system elapsed
+ 4.436 1.012 1.402
+")
+```
+
+You've just selected a subset out of a dataset with around 2 billion rows, computed
+a new column, and aggregated it in under 2 seconds on a modern laptop. How does
+this work?
+
+First, `mutate()`/`transmute()`, `select()`/`rename()`/`relocate()`, `filter()`,
+`group_by()`, and `arrange()` record their actions but don't evaluate on the
+data until you run `collect()`.
+
+```{r, eval = file.exists("nyc-taxi")}
+ds %>%
+ filter(total_amount > 100, year == 2015) %>%
+ select(tip_amount, total_amount, passenger_count) %>%
+ mutate(tip_pct = 100 * tip_amount / total_amount) %>%
+ group_by(passenger_count)
+```
+
+```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
+cat("
+FileSystemDataset (query)
+tip_amount: float
+total_amount: float
+passenger_count: int8
+tip_pct: expr
+
+* Filter: ((total_amount > 100) and (year == 2015))
+* Grouped by passenger_count
+See $.data for the source Arrow object
+")
+```
+
+This code returns output instantly and shows the manipulations you've made, without
+loading data from the files. Because the evaluation of these queries is deferred,
+you can build up a query that selects down to a small subset without generating
+intermediate datasets that would potentially be large.
+
+Second, all work is pushed down to the individual data files,
+and depending on the file format, chunks of data within the files. As a result,
+you can select a subset of data from a much larger dataset by collecting the
+smaller slices from each file—you don't have to load the whole dataset in
+memory to slice from it.
+
+Third, because of partitioning, you can ignore some files entirely.
+In this example, by filtering `year == 2015`, all files corresponding to other years
+are immediately excluded: you don't have to load them in order to find that no
+rows match the filter. Relatedly, since Parquet files contain row groups with
+statistics on the data within, there may be entire chunks of data you can
+avoid scanning because they have no rows where `total_amount > 100`.
+
+## More dataset options
+
+There are a few ways you can control the Dataset creation to adapt to special use cases.
+
+### Work with files in a directory
+
+If you are working with a single file or a set of files that are not all in the
+same directory, you can provide a file path or a vector of multiple file paths
+to `open_dataset()`. This is useful if, for example, you have a single CSV file
+that is too big to read into memory. You could pass the file path to
+`open_dataset()`, use `group_by()` to partition the Dataset into manageable chunks,
+then use `write_dataset()` to write each chunk to a separate Parquet file—all
+without needing to read the full CSV file into R.
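+
+For example, here is a minimal sketch of that workflow; the `"big-file.csv"` path
+and the `year` column are hypothetical stand-ins for your own data:
+
+```{r, eval = FALSE}
+# A single large CSV that doesn't fit in memory
+huge_csv <- open_dataset("big-file.csv", format = "csv")
+huge_csv %>%
+  group_by(year) %>%                  # partition on a column present in the data
+  write_dataset("data-parquet", format = "parquet")
+```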
+
+### Explicitly declare column names and data types
+
+You can specify the `schema` argument to `open_dataset()` to declare the columns
+and their data types. This is useful if you have data files that have different
+storage schemas (for example, a column could be `int32` in one file and `int8` in
+another) and you want to ensure that the resulting Dataset has a specific type.
+
+To be clear, it's not necessary to specify a schema, even in this example of
+mixed integer types, because the Dataset constructor will reconcile differences
+like these. The schema specification just lets you declare what you want the
+result to be.
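+
+As a minimal sketch, with hypothetical column names (in practice you would list
+every column that appears in your files):
+
+```{r, eval = FALSE}
+# Read `id` as int32 everywhere, even in files where it is stored as int8
+ds <- open_dataset(
+  "my-data/",
+  schema = schema(
+    id = int32(),
+    value = float64(),
+    category = utf8()
+  )
+)
+```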
+
+### Explicitly declare partition format
+
+Similarly, you can provide a Schema in the `partitioning` argument of `open_dataset()`
+in order to declare the types of the virtual columns that define the partitions.
+This would be useful, in the taxi dataset example, if you wanted to keep
+`month` as a string instead of an integer.
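+
+For example, a minimal sketch of what that could look like for the taxi paths:
+
+```{r, eval = FALSE}
+# Read the month path segment as a string ("01", "02", ...) rather than an integer
+ds <- open_dataset(
+  "nyc-taxi",
+  partitioning = schema(year = int32(), month = utf8())
+)
+```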
+
+### Work with multiple data sources
+
+Another feature of Datasets is that they can be composed of multiple data sources.
+That is, you may have a directory of partitioned Parquet files in one location,
+and in another directory, files that haven't been partitioned.
+Or, you could point to an S3 bucket of Parquet data and a directory
+of CSVs on the local file system and query them together as a single dataset.
+To create a multi-source dataset, provide a list of datasets to `open_dataset()`
+instead of a file path, or simply concatenate them like `big_dataset <- c(ds1, ds2)`.
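+
+For example, a minimal sketch with two hypothetical locations:
+
+```{r, eval = FALSE}
+# Hypothetical paths: a partitioned Parquet directory plus a directory of CSVs
+ds1 <- open_dataset("historical-data/", partitioning = "year")
+ds2 <- open_dataset("new-data/", format = "csv")
+
+# Query them together as one dataset
+big_dataset <- c(ds1, ds2)
+```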
+
+## Writing datasets
+
+As you can see, querying a large dataset can be made quite fast by storage in an
+efficient binary columnar format like Parquet or Feather and partitioning based on
+columns commonly used for filtering. However, data isn't always stored that way.
+Sometimes you might start with one giant CSV. The first step in analyzing data
+is cleaning it up and reshaping it into a more usable form.
+
+The `write_dataset()` function allows you to take a Dataset or another tabular
+data object—an Arrow Table or RecordBatch, or an R data frame—and write
+it to a different file format, partitioned into multiple files.
+
+Assume that you have a version of the NYC Taxi data as CSV:
+
+```r
+ds <- open_dataset("nyc-taxi/csv/", format = "csv")
+```
+
+You can write it to a new location and translate the files to the Feather format
+by calling `write_dataset()` on it:
+
+```r
+write_dataset(ds, "nyc-taxi/feather", format = "feather")
+```
+
+Next, let's imagine that the `payment_type` column is something you often filter
+on, so you want to partition the data by that variable. By doing so you ensure
+that a filter like `payment_type == "Cash"` will touch only a subset of files
+where `payment_type` is always `"Cash"`.
+
+One natural way to express the columns you want to partition on is to use the
+`group_by()` method:
+
+```r
+ds %>%
+ group_by(payment_type) %>%
+ write_dataset("nyc-taxi/feather", format = "feather")
+```
+
+This will write files to a directory tree that looks like this:
+
+```r
+system("tree nyc-taxi/feather")
+```
+
+```
+## feather
+## ├── payment_type=1
+## │ └── part-18.feather
+## ├── payment_type=2
+## │ └── part-19.feather
+## ...
+## └── payment_type=UNK
+## └── part-17.feather
+##
+## 18 directories, 23 files
+```
+
+Note that the directory names are `payment_type=Cash` and similar:
+this is the Hive-style partitioning described above. This means that when
+you call `open_dataset()` on this directory, you don't have to declare what the
+partitions are because they can be read from the file paths.
+(To instead write bare values for partition segments, i.e. `Cash` rather than
+`payment_type=Cash`, call `write_dataset()` with `hive_style = FALSE`.)
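+
+For example, a minimal sketch using the `partitioning` argument instead of
+`group_by()`:
+
+```r
+# Write bare partition values ("Cash", ...) instead of "payment_type=Cash" directories
+ds %>%
+  write_dataset("nyc-taxi/feather", format = "feather",
+                partitioning = "payment_type", hive_style = FALSE)
+```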
+
+Perhaps, though, `payment_type == "Cash"` is the only data you ever care about,
+and you just want to drop the rest and have a smaller working set.
+For this, you can `filter()` out the other rows when writing:
+
+```r
+ds %>%
+ filter(payment_type == "Cash") %>%
+ write_dataset("nyc-taxi/feather", format = "feather")
+```
+
+The other thing you can do when writing datasets is select a subset of columns
+or reorder them. Suppose you never care about `vendor_id`; since it is a string column,
+it can take up a lot of space when you read it in, so let's drop it:
+
+```r
+ds %>%
+ group_by(payment_type) %>%
+ select(-vendor_id) %>%
+ write_dataset("nyc-taxi/feather", format = "feather")
+```
+
+Note that while you can select a subset of columns,
+you cannot currently rename columns when writing a dataset.