---
title: "Working with Arrow Datasets and dplyr"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Working with Arrow Datasets and dplyr}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---
Apache Arrow lets you work efficiently with large, multi-file datasets.
The arrow R package provides a [dplyr](https://dplyr.tidyverse.org/) interface to Arrow Datasets,
and other tools for interactive exploration of Arrow data.
This vignette introduces Datasets and shows how to use dplyr to analyze them.
## Example: NYC taxi data
The [New York City taxi trip record data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
is widely used in big data exercises and competitions.
For demonstration purposes, we have hosted a Parquet-formatted version
of about ten years of the trip data in a public Amazon S3 bucket.
The total file size is around 37 gigabytes, even in the efficient Parquet file
format. That's bigger than memory on most people's computers, so you can't just
read it all in and stack it into a single data frame.
The Windows (for R > 3.6) and macOS binary packages include S3 support.
On Linux, when installing from source, S3 support is not enabled by default,
and it has additional system requirements.
See `vignette("install", package = "arrow")` for details.
To see if your arrow installation has S3 support, run:
```{r}
arrow::arrow_with_s3()
```
Even with S3 support enabled, network speed will be a bottleneck unless your
machine is located in the same AWS region as the data. So, for this vignette,
we assume that the NYC taxi dataset has been downloaded locally in an "nyc-taxi"
directory.
### Retrieving data from a public Amazon S3 bucket
If your arrow build has S3 support, you can sync the data locally with:
```{r, eval = FALSE}
arrow::copy_files("s3://ursa-labs-taxi-data", "nyc-taxi")
```
If your arrow build doesn't have S3 support, you can download the files
with some additional code:
```{r, eval = FALSE}
bucket <- "https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com"
for (year in 2009:2019) {
  if (year == 2019) {
    # We only have through June 2019 there
    months <- 1:6
  } else {
    months <- 1:12
  }
  for (month in sprintf("%02d", months)) {
    dir.create(file.path("nyc-taxi", year, month), recursive = TRUE)
    try(download.file(
      paste(bucket, year, month, "data.parquet", sep = "/"),
      file.path("nyc-taxi", year, month, "data.parquet"),
      mode = "wb"
    ), silent = TRUE)
  }
}
```
Note that these download steps in the vignette are not executed: if you want to run
with live data, you'll have to do it yourself separately.
Given the size, if you're running this locally and don't have a fast connection,
feel free to grab only a year or two of data.
If you don't have the taxi data downloaded, the vignette will still run and will
yield previously cached output for reference. To be explicit about which version
is running, let's check whether you're running with live data:
```{r}
dir.exists("nyc-taxi")
```
## Opening the dataset
Because dplyr is not necessary for many Arrow workflows,
it is an optional (`Suggests`) dependency. So, to work with Datasets,
you need to load both arrow and dplyr.
```{r}
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)
```
The first step is to create a Dataset object, pointing at the directory of data.
```{r, eval = file.exists("nyc-taxi")}
ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
```
The file format for `open_dataset()` is controlled by the `format` parameter,
which has a default value of `"parquet"`. If you had a directory
of Arrow format files, you could instead specify `format = "arrow"` in the call.
Other supported formats include:

* `"feather"` or `"ipc"` (aliases for `"arrow"`, as Feather v2 is the Arrow file format)
* `"csv"` (comma-delimited files) and `"tsv"` (tab-delimited files)
* `"text"` (generic text-delimited files - use the `delimiter` argument to specify which to use)

For text files, you can pass the following parsing options to `open_dataset()`:

* `delim`
* `quote`
* `escape_double`
* `escape_backslash`
* `skip_empty_rows`

For more information on the usage of these parameters, see `?read_delim_arrow()`.
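As a minimal sketch of the `format` argument, the following writes two small CSV files to a temporary directory and opens them as one Dataset. The directory, file names, and the `id` and `fare` columns are made up for illustration and are not part of the taxi data.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Hypothetical scratch directory holding two small CSV files
csv_dir <- tempfile("csv-demo-")
dir.create(csv_dir)
write.csv(
  data.frame(id = 1:3, fare = c(5.5, 12, 7.25)),
  file.path(csv_dir, "a.csv"),
  row.names = FALSE
)
write.csv(
  data.frame(id = 4:5, fare = c(30, 2.5)),
  file.path(csv_dir, "b.csv"),
  row.names = FALSE
)

# Tell open_dataset() that the files are CSV rather than the default Parquet
csv_ds <- open_dataset(csv_dir, format = "csv")

# Query across both files and pull the matching rows into a data frame
result <- csv_ds %>%
  filter(fare > 6) %>%
  collect()
```

Both files are treated as a single table, so the filter applies to rows from either file.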
The `partitioning` argument lets you specify how the file paths provide information
about how the dataset is chunked into different files. The files in this example
have file paths like
```
2009/01/data.parquet
2009/02/data.parquet
...
```
By providing `c("year", "month")` to the `partitioning` argument, you're saying that the first
path segment gives the value for `year`, and the second segment is `month`.
Every row in `2009/01/data.parquet` has a value of 2009 for `year`
and 1 for `month`, even though those columns may not be present in the file.
Indeed, when you look at the dataset, you can see that in addition to the columns present
in every file, there are also columns `year` and `month` even though they are not present in the files themselves.
```{r, eval = file.exists("nyc-taxi")}
ds
```
```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
cat("
FileSystemDataset with 125 Parquet files
vendor_id: string
pickup_at: timestamp[us]
dropoff_at: timestamp[us]
passenger_count: int8
trip_distance: float
pickup_longitude: float
pickup_latitude: float
rate_code_id: null
store_and_fwd_flag: string
dropoff_longitude: float
dropoff_latitude: float
payment_type: string
fare_amount: float
extra: float
mta_tax: float
tip_amount: float
tolls_amount: float
total_amount: float
year: int32
month: int32
See $metadata for additional Schema metadata
")
```
The other form of partitioning currently supported is [Hive](https://hive.apache.org/)-style,
in which the partition variable names are included in the path segments.
If you had saved your files in paths like:
```
year=2009/month=01/data.parquet
year=2009/month=02/data.parquet
...
```
you would not have had to provide the names in `partitioning`;
you could have just called `ds <- open_dataset("nyc-taxi")` and the partitions
would have been detected automatically.
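To illustrate that automatic detection, here is a small sketch that builds a tiny Hive-style directory tree in a temporary location and opens it without a `partitioning` argument. The paths and the `fare_amount` values are invented for the example.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Hypothetical Hive-style layout: year=2009/month=01 and year=2009/month=02
root <- tempfile("hive-demo-")
for (m in c("01", "02")) {
  part_dir <- file.path(root, "year=2009", paste0("month=", m))
  dir.create(part_dir, recursive = TRUE)
  write_parquet(
    data.frame(fare_amount = c(10, 20)),
    file.path(part_dir, "data.parquet")
  )
}

# No partitioning argument: the names are read from the path segments
ds_hive <- open_dataset(root)
hive_cols <- names(ds_hive)

# The partition columns come back alongside the column stored in the files
hive_rows <- collect(ds_hive)
```

Here `hive_cols` should include `year` and `month` even though neither column is stored inside the Parquet files.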
## Querying the dataset
Up to this point, you haven't loaded any data. You've walked directories to find
files, you've parsed file paths to identify partitions, and you've read the
headers of the Parquet files to inspect their schemas so that you can make sure
they all are as expected.
In the current release, arrow supports the dplyr verbs `mutate()`,
`transmute()`, `select()`, `rename()`, `relocate()`, `filter()`, and
`arrange()`. Aggregation is not yet supported, so before you call `summarise()`
or other verbs with aggregate functions, use `collect()` to pull the selected
subset of the data into an in-memory R data frame.
If you call an unsupported dplyr verb or an unimplemented function in a query
on an Arrow Dataset, the arrow package raises an error. However,
for dplyr queries on Arrow Table objects (which are already in memory), the
package automatically calls `collect()` before processing that dplyr verb.
Here's an example: suppose that you are curious about tipping behavior among the
longest taxi rides. Let's find the median tip percentage for rides with
fares greater than $100 in 2015, broken down by the number of passengers:
```{r, eval = file.exists("nyc-taxi")}
system.time(ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount) %>%
  group_by(passenger_count) %>%
  collect() %>%
  summarise(
    median_tip_pct = median(tip_pct),
    n = n()
  ) %>%
  print())
```
```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
cat("
# A tibble: 10 x 3
   passenger_count median_tip_pct      n
             <int>          <dbl>  <int>
 1               0           9.84    380
 2               1          16.7  143087
 3               2          16.6   34418
 4               3          14.4    8922
 5               4          11.4    4771
 6               5          16.7    5806
 7               6          16.7    3338
 8               7          16.7      11
 9               8          16.7      32
10               9          16.7      42
   user  system elapsed
  4.436   1.012   1.402
")
```
You've just selected a subset out of a dataset with around 2 billion rows, computed
a new column, and aggregated it in under 2 seconds on a modern laptop. How does
this work?
First, `mutate()`/`transmute()`, `select()`/`rename()`/`relocate()`, `filter()`,
`group_by()`, and `arrange()` record their actions but don't evaluate on the
data until you run `collect()`.
```{r, eval = file.exists("nyc-taxi")}
ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount) %>%
  group_by(passenger_count)
```
```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
cat("
FileSystemDataset (query)
tip_amount: float
total_amount: float
passenger_count: int8
tip_pct: expr
* Filter: ((total_amount > 100) and (year == 2015))
* Grouped by passenger_count
See $.data for the source Arrow object
")
```
This code returns its output instantly and shows the manipulations you've made, without
loading data from the files. Because the evaluation of these queries is deferred,
you can build up a query that selects down to a small subset without generating
intermediate datasets that would potentially be large.
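The deferred evaluation can be seen on a toy dataset as well. The sketch below uses a three-row data frame with invented values, written to a temporary directory, in place of the taxi data.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Hypothetical three-row stand-in for the taxi data
toy <- data.frame(
  passenger_count = c(1L, 1L, 2L),
  tip_amount = c(20, 30, 10),
  total_amount = c(120, 150, 80)
)
toy_dir <- tempfile("lazy-demo-")
write_dataset(toy, toy_dir)
toy_ds <- open_dataset(toy_dir)

# Building the query only records the steps; no rows are read yet
query <- toy_ds %>%
  filter(total_amount > 100) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount)

# The query is a lazy query object, not a data frame
query_is_lazy <- inherits(query, "arrow_dplyr_query")

# collect() runs the recorded steps and returns an R data frame
result <- collect(query)
```

Only the final `collect()` call touches the files; everything before it just describes the work to be done.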
Second, all work is pushed down to the individual data files,
and depending on the file format, chunks of data within the files. As a result,
you can select a subset of data from a much larger dataset by collecting the
smaller slices from each file—you don't have to load the whole dataset in
memory to slice from it.
Third, because of partitioning, you can ignore some files entirely.
In this example, by filtering `year == 2015`, all files corresponding to other years
are immediately excluded: you don't have to load them in order to find that no
rows match the filter. Relatedly, since Parquet files contain row groups with
statistics on the data within, there may be entire chunks of data you can
avoid scanning because they have no rows where `total_amount > 100`.
## More dataset options
There are a few ways you can control the Dataset creation to adapt to special use cases.
### Work with files in a directory
If you are working with a single file or a set of files that are not all in the
same directory, you can provide a file path or a vector of multiple file paths
to `open_dataset()`. This is useful if, for example, you have a single CSV file
that is too big to read into memory. You could pass the file path to
`open_dataset()`, use `group_by()` to partition the Dataset into manageable chunks,
then use `write_dataset()` to write each chunk to a separate Parquet file—all
without needing to read the full CSV file into R.
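A minimal sketch of that workflow follows. A four-row CSV stands in for a file that is too big for memory; the file location and the `payment_type` and `fare` values are assumptions made for the example.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Hypothetical small CSV standing in for one very large file
csv_file <- tempfile(fileext = ".csv")
write.csv(
  data.frame(
    payment_type = c("Cash", "Credit", "Cash", "Credit"),
    fare = c(5, 12.5, 8, 30)
  ),
  csv_file,
  row.names = FALSE
)

# Open the single file as a Dataset, partition by a column, and
# write each partition to its own Parquet file
out_dir <- tempfile("parquet-out-")
open_dataset(csv_file, format = "csv") %>%
  group_by(payment_type) %>%
  write_dataset(out_dir, format = "parquet")

# One subdirectory per distinct value of the grouping column
partition_dirs <- list.files(out_dir)
```

The grouping column determines the output layout, so choose one with a manageable number of distinct values.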
### Explicitly declare column names and data types
You can specify the `schema` argument to `open_dataset()` to declare the columns
and their data types. This is useful if you have data files that have different
storage schema (for example, a column could be `int32` in one and `int8` in
another) and you want to ensure that the resulting Dataset has a specific type.
To be clear, it's not necessary to specify a schema, even in this example of
mixed integer types, because the Dataset constructor will reconcile differences
like these. The schema specification just lets you declare what you want the
result to be.
### Explicitly declare partition format
Similarly, you can provide a Schema in the `partitioning` argument of `open_dataset()`
in order to declare the types of the virtual columns that define the partitions.
This would be useful, in the taxi dataset example, if you wanted to keep
`month` as a string instead of an integer.
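As a sketch of this, the example below creates a single file under a `2009/01` path in a temporary directory (the path and values are made up) and declares `month` as a string through a Schema.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Hypothetical one-file dataset laid out as <year>/<month>/data.parquet
root <- tempfile("typed-partition-")
part_dir <- file.path(root, "2009", "01")
dir.create(part_dir, recursive = TRUE)
write_parquet(
  data.frame(fare_amount = c(10, 20)),
  file.path(part_dir, "data.parquet")
)

# Declare both the names and the types of the partition columns
ds_typed <- open_dataset(
  root,
  partitioning = schema(year = int32(), month = utf8())
)

typed_rows <- collect(ds_typed)
```

Because `month` is declared as `utf8()`, the path segment `01` is kept as the string `"01"` and is not parsed as the integer 1.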
### Work with multiple data sources
Another feature of Datasets is that they can be composed of multiple data sources.
That is, you may have a directory of partitioned Parquet files in one location,
and in another directory, files that haven't been partitioned.
Or, you could point to an S3 bucket of Parquet data and a directory
of CSVs on the local file system and query them together as a single dataset.
To create a multi-source dataset, provide a list of datasets to `open_dataset()`
instead of a file path, or simply concatenate them like `big_dataset <- c(ds1, ds2)`.
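Both approaches can be sketched with two small Parquet datasets. The directories and the `id` and `fare` columns below are invented, and the two sources are assumed to share the same schema.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Two hypothetical sources with identical columns, in separate directories
dir1 <- tempfile("source-one-")
dir2 <- tempfile("source-two-")
write_dataset(data.frame(id = 1:3, fare = c(5, 6, 7)), dir1)
write_dataset(data.frame(id = 4:5, fare = c(8, 9)), dir2)

ds1 <- open_dataset(dir1)
ds2 <- open_dataset(dir2)

# Either pass a list of Datasets to open_dataset() ...
big_dataset <- open_dataset(list(ds1, ds2))
# ... or concatenate them
also_big <- c(ds1, ds2)

# Queries run across both sources as if they were one table
combined <- big_dataset %>%
  filter(fare > 5) %>%
  collect()
```

Sources in different file formats can be combined the same way, provided their schemas are compatible.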
## Writing datasets
As you can see, querying a large dataset can be made quite fast by storage in an
efficient binary columnar format like Parquet or Feather and partitioning based on
columns commonly used for filtering. However, data isn't always stored that way.
Sometimes you might start with one giant CSV. Often, the first step in analyzing data
is cleaning it up and reshaping it into a more usable form.
The `write_dataset()` function allows you to take a Dataset or another tabular
data object—an Arrow Table or RecordBatch, or an R data frame—and write
it to a different file format, partitioned into multiple files.
Assume that you have a version of the NYC Taxi data as CSV:
```r
ds <- open_dataset("nyc-taxi/csv/", format = "csv")
```
You can write it to a new location and translate the files to the Feather format
by calling `write_dataset()` on it:
```r
write_dataset(ds, "nyc-taxi/feather", format = "feather")
```
Next, let's imagine that the `payment_type` column is something you often filter
on, so you want to partition the data by that variable. By doing so you ensure
that a filter like `payment_type == "Cash"` will touch only a subset of files
where `payment_type` is always `"Cash"`.
One natural way to express the columns you want to partition on is to use the
`group_by()` method:
```r
ds %>%
  group_by(payment_type) %>%
  write_dataset("nyc-taxi/feather", format = "feather")
```
This will write files to a directory tree that looks like this:
```r
system("tree nyc-taxi/feather")
```
```
## feather
## ├── payment_type=1
## │   └── part-18.feather
## ├── payment_type=2
## │   └── part-19.feather
## ...
## └── payment_type=UNK
##     └── part-17.feather
##
## 18 directories, 23 files
```
Note that the directory names are `payment_type=Cash` and similar:
this is the Hive-style partitioning described above. This means that when
you call `open_dataset()` on this directory, you don't have to declare what the
partitions are because they can be read from the file paths.
(To instead write bare values for partition segments, i.e. `Cash` rather than
`payment_type=Cash`, call `write_dataset()` with `hive_style = FALSE`.)
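Here is a minimal sketch of writing bare partition segments and reading them back. It uses a three-row data frame with invented values; the partition column is named in the `partitioning` argument and is not taken from `group_by()`.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Hypothetical toy data in place of the taxi dataset
toy <- data.frame(
  payment_type = c("Cash", "Credit", "Cash"),
  fare = c(5, 12.5, 8)
)

# Write directory names as bare values, without the "payment_type=" prefix
bare_dir <- tempfile("bare-partition-")
write_dataset(
  toy,
  bare_dir,
  format = "feather",
  partitioning = "payment_type",
  hive_style = FALSE
)
bare_segments <- list.files(bare_dir)

# Without Hive-style names, the partition column must be named on read
ds_bare <- open_dataset(
  bare_dir,
  format = "feather",
  partitioning = "payment_type"
)
bare_rows <- collect(ds_bare)
```

The trade-off is that the paths no longer describe themselves, so every reader has to supply the partition names.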
Perhaps, though, `payment_type == "Cash"` is the only data you ever care about,
and you just want to drop the rest and have a smaller working set.
For this, you can `filter()` them out when writing:
```r
ds %>%
  filter(payment_type == "Cash") %>%
  write_dataset("nyc-taxi/feather", format = "feather")
```
The other thing you can do when writing datasets is select a subset of columns
or reorder them. Suppose you never care about `vendor_id`. Because it is a string column,
it can take up a lot of space when you read it in, so let's drop it:
```r
ds %>%
  group_by(payment_type) %>%
  select(-vendor_id) %>%
  write_dataset("nyc-taxi/feather", format = "feather")
```
Note that while you can select a subset of columns,
you cannot currently rename columns when writing a dataset.