summaryrefslogtreecommitdiffstats
path: root/src/arrow/r/R/dataset.R
blob: 7207a554313d9ff0b3daec79780b23d5b40a639a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

#' Open a multi-file dataset
#'
#' Arrow Datasets allow you to query against data that has been split across
#' multiple files. This sharding of data may indicate partitioning, which
#' can accelerate queries that only touch some partitions (files). Call
#' `open_dataset()` to point to a directory of data files and return a
#' `Dataset`, then use `dplyr` methods to query it.
#'
#' @param sources One of:
#'   * a string path or URI to a directory containing data files
#'   * a string path or URI to a single file
#'   * a character vector of paths or URIs to individual data files
#'   * a list of `Dataset` objects as created by this function
#'   * a list of `DatasetFactory` objects as created by [dataset_factory()].
#'
#' When `sources` is a vector of file URIs, they must all use the same protocol
#' and point to files located in the same file system and having the same
#' format.
#' @param schema [Schema] for the `Dataset`. If `NULL` (the default), the schema
#' will be inferred from the data sources.
#' @param partitioning When `sources` is a directory path/URI, one of:
#'   * a `Schema`, in which case the file paths relative to `sources` will be
#'    parsed, and path segments will be matched with the schema fields. For
#'    example, `schema(year = int16(), month = int8())` would create partitions
#'    for file paths like `"2019/01/file.parquet"`, `"2019/02/file.parquet"`,
#'    etc.
#'   * a character vector that defines the field names corresponding to those
#'    path segments (that is, you're providing the names that would correspond
#'    to a `Schema` but the types will be autodetected)
#'   * a `HivePartitioning` or `HivePartitioningFactory`, as returned
#'    by [hive_partition()] which parses explicit or autodetected fields from
#'    Hive-style path segments
#'   * `NULL` for no partitioning
#'
#' The default is to autodetect Hive-style partitions. When `sources` is not a
#' directory path/URI, `partitioning` is ignored.
#' @param unify_schemas logical: should all data fragments (files, `Dataset`s)
#' be scanned in order to create a unified schema from them? If `FALSE`, only
#' the first fragment will be inspected for its schema. Use this fast path
#' when you know and trust that all fragments have an identical schema.
#' The default is `FALSE` when creating a dataset from a directory path/URI or
#' vector of file paths/URIs (because there may be many files and scanning may
#' be slow) but `TRUE` when `sources` is a list of `Dataset`s (because there
#' should be few `Dataset`s in the list and their `Schema`s are already in
#' memory).
#' @param format A [FileFormat] object, or a string identifier of the format of
#' the files in `sources`. This argument is ignored when `sources` is a list of `Dataset` objects.
#' Currently supported values:
#' * "parquet"
#' * "ipc"/"arrow"/"feather", all aliases for each other; for Feather, note that
#'   only version 2 files are supported
#' * "csv"/"text", aliases for the same thing (because comma is the default
#'   delimiter for text files)
#' * "tsv", equivalent to passing `format = "text", delimiter = "\t"`
#'
#' Default is "parquet", unless a `delimiter` is also specified, in which case
#' it is assumed to be "text".
#' @param ... additional arguments passed to `dataset_factory()` when `sources`
#' is a directory path/URI or vector of file paths/URIs, otherwise ignored.
#' These may include `format` to indicate the file format, or other
#' format-specific options.
#' @return A [Dataset] R6 object. Use `dplyr` methods on it to query the data,
#' or call [`$NewScan()`][Scanner] to construct a query directly.
#' @export
#' @seealso `vignette("dataset", package = "arrow")`
#' @include arrow-package.R
#' @examplesIf arrow_with_dataset() & arrow_with_parquet()
#' # Set up directory for examples
#' tf <- tempfile()
#' dir.create(tf)
#' on.exit(unlink(tf))
#'
#' data <- dplyr::group_by(mtcars, cyl)
#' write_dataset(data, tf)
#'
#' # You can specify a directory containing the files for your dataset and
#' # open_dataset will scan all files in your directory.
#' open_dataset(tf)
#'
#' # You can also supply a vector of paths
#' open_dataset(c(file.path(tf, "cyl=4/part-0.parquet"), file.path(tf, "cyl=8/part-0.parquet")))
#'
#' ## You must specify the file format if using a format other than parquet.
#' tf2 <- tempfile()
#' dir.create(tf2)
#' on.exit(unlink(tf2))
#' write_dataset(data, tf2, format = "ipc")
#' # This line will result in errors when you try to work with the data
#' \dontrun{
#' open_dataset(tf2)
#' }
#' # This line will work
#' open_dataset(tf2, format = "ipc")
#'
#' ## You can specify file partitioning to include it as a field in your dataset
#' # Create a temporary directory and write example dataset
#' tf3 <- tempfile()
#' dir.create(tf3)
#' on.exit(unlink(tf3))
#' write_dataset(airquality, tf3, partitioning = c("Month", "Day"), hive_style = FALSE)
#'
#' # View files - you can see the partitioning means that files have been written
#' # to folders based on Month/Day values
#' tf3_files <- list.files(tf3, recursive = TRUE)
#'
#' # With no partitioning specified, dataset contains all files but doesn't include
#' # directory names as field names
#' open_dataset(tf3)
#'
#' # Now that partitioning has been specified, your dataset contains columns for Month and Day
#' open_dataset(tf3, partitioning = c("Month", "Day"))
#'
#' # If you want to specify the data types for your fields, you can pass in a Schema
#' open_dataset(tf3, partitioning = schema(Month = int8(), Day = int8()))
open_dataset <- function(sources,
                         schema = NULL,
                         partitioning = hive_partition(),
                         unify_schemas = NULL,
                         format = c("parquet", "arrow", "ipc", "feather", "csv", "tsv", "text"),
                         ...) {
  if (!arrow_with_dataset()) {
    stop("This build of the arrow package does not support Datasets", call. = FALSE)
  }

  # Case 1: `sources` is a list of already-opened Datasets -> build a union.
  # `partitioning`, `format` and `...` are not used on this path.
  if (is_list_of(sources, "Dataset")) {
    if (is.null(schema)) {
      if (is.null(unify_schemas) || isTRUE(unify_schemas)) {
        # Default is to unify schemas here
        schema <- unify_schemas(schemas = map(sources, ~ .$schema))
      } else {
        # Take the first one.
        schema <- sources[[1]]$schema
      }
    }
    # Enforce that all datasets have the same schema
    assert_is(schema, "Schema")
    # Build the children from the Datasets that ReplaceSchema returns instead
    # of assigning through `x$schema <-`: the value produced by an assignment
    # is not available to the caller, and this way the Dataset objects that
    # were passed in are never written to.
    sources <- lapply(sources, function(x) {
      dataset___Dataset__ReplaceSchema(x, schema)
    })
    return(dataset___UnionDataset__create(sources, schema))
  }

  # Case 2: anything else is handed to the DatasetFactory.
  factory <- DatasetFactory$create(sources, partitioning = partitioning, format = format, schema = schema, ...)
  tryCatch(
    # Default is _not_ to inspect/unify schemas
    factory$Finish(schema, isTRUE(unify_schemas)),
    error = function(e) {
      # `format` is passed along unmodified (possibly still the full default
      # vector) so the handler can take it into account.
      handle_parquet_io_error(e, format)
    }
  )
}

#' Multi-file datasets
#'
#' @description
#' Arrow Datasets allow you to query against data that has been split across
#' multiple files. This sharding of data may indicate partitioning, which
#' can accelerate queries that only touch some partitions (files).
#'
#' A `Dataset` contains one or more `Fragments`, such as files, of potentially
#' differing type and partitioning.
#'
#' For `Dataset$create()`, see [open_dataset()], which is an alias for it.
#'
#' `DatasetFactory` is used to provide finer control over the creation of `Dataset`s.
#'
#' @section Factory:
#' `DatasetFactory` is used to create a `Dataset`, inspect the [Schema] of the
#' fragments contained in it, and declare a partitioning.
#' `FileSystemDatasetFactory` is a subclass of `DatasetFactory` for
#' discovering files in the local file system, the only currently supported
#' file system.
#'
#' For the `DatasetFactory$create()` factory method, see [dataset_factory()], an
#' alias for it. A `DatasetFactory` has:
#'
#' - `$Inspect(unify_schemas)`: If `unify_schemas` is `TRUE`, all fragments
#' will be scanned and a unified [Schema] will be created from them; if `FALSE`
#' (default), only the first fragment will be inspected for its schema. Use this
#' fast path when you know and trust that all fragments have an identical schema.
#' - `$Finish(schema, unify_schemas)`: Returns a `Dataset`. If `schema` is provided,
#' it will be used for the `Dataset`; if omitted, a `Schema` will be created from
#' inspecting the fragments (files) in the dataset, following `unify_schemas`
#' as described above.
#'
#' `FileSystemDatasetFactory$create()` is a lower-level factory method and
#' takes the following arguments:
#' * `filesystem`: A [FileSystem]
#' * `selector`: Either a [FileSelector] or `NULL`
#' * `paths`: Either a character vector of file paths or `NULL`
#' * `format`: A [FileFormat]
#' * `partitioning`: Either `Partitioning`, `PartitioningFactory`, or `NULL`
#' @section Methods:
#'
#' A `Dataset` has the following methods:
#' - `$NewScan()`: Returns a [ScannerBuilder] for building a query
#' - `$schema`: Active binding that returns the [Schema] of the Dataset; you
#'   may also replace the dataset's schema by using `ds$schema <- new_schema`.
#'   This method currently supports only adding, removing, or reordering
#'   fields in the schema: you cannot alter or cast the field types.
#'
#' `FileSystemDataset` has the following methods:
#' - `$files`: Active binding, returns the files of the `FileSystemDataset`
#' - `$format`: Active binding, returns the [FileFormat] of the `FileSystemDataset`
#'
#' `UnionDataset` has the following methods:
#' - `$children`: Active binding, returns all child `Dataset`s.
#'
#' @export
#' @seealso [open_dataset()] for a simple interface to creating a `Dataset`
Dataset <- R6Class("Dataset",
  inherit = ArrowObject,
  public = list(
    # @description
    # Start a new scan of the data
    # @return A [ScannerBuilder]
    NewScan = function() dataset___Dataset__NewScan(self),
    # @description
    # String form of the Dataset: the string form of its schema
    ToString = function() self$schema$ToString()
  ),
  active = list(
    # @description
    # Read the Dataset's schema, or replace it via `ds$schema <- new_schema`
    schema = function(schema) {
      if (missing(schema)) {
        dataset___Dataset__schema(self)
      } else {
        assert_is(schema, "Schema")
        replaced <- dataset___Dataset__ReplaceSchema(self, schema)
        # The value returned by an active-binding setter is discarded by R,
        # so returning `replaced` would leave this object unchanged. Adopt
        # the new object's pointer so the assignment takes effect in place.
        # NOTE(review): `$pointer()` / `$set_pointer()` are expected to come
        # from ArrowObject, which is defined outside this file -- confirm.
        self$set_pointer(replaced$pointer())
        invisible(self)
      }
    },
    # @description
    # Metadata of the Dataset's schema
    metadata = function() self$schema$metadata,
    # @description
    # Row count, obtained by building a new scan and counting its rows
    num_rows = function() self$NewScan()$Finish()$CountRows(),
    # @description
    # Column count: the length of the schema
    num_cols = function() length(self$schema),
    # @description
    # Return the Dataset's type.
    type = function() dataset___Dataset__type_name(self)
  )
)
Dataset$create <- open_dataset

#' @name FileSystemDataset
#' @rdname Dataset
#' @export
FileSystemDataset <- R6Class("FileSystemDataset",
  inherit = Dataset,
  public = list(
    # @description
    # Build a one-line title of the form
    # "<class> with <n> <format> file(s)"
    .class_title = function() {
      nfiles <- length(self$files)
      file_type <- self$format$type

      # Display names for some format types; any type not listed here is
      # shown unchanged.
      display_names <- list(parquet = "Parquet", ipc = "Feather")
      type_label <- display_names[[file_type]] %||% file_type
      noun <- if (nfiles == 1) "file" else "files"

      paste(class(self)[[1]], "with", nfiles, type_label, noun)
    }
  ),
  active = list(
    # @description
    # Return the files contained in this `FileSystemDataset`
    files = function() {
      dataset___FileSystemDataset__files(self)
    },
    # @description
    # Return the format of files in this `Dataset`
    format = function() dataset___FileSystemDataset__format(self),
    # @description
    # Return the filesystem of files in this `Dataset`
    filesystem = function() dataset___FileSystemDataset__filesystem(self)
  )
)

#' @name UnionDataset
#' @rdname Dataset
#' @export
UnionDataset <- R6Class("UnionDataset",
  inherit = Dataset,
  active = list(
    # @description
    # Return the child `Dataset`s that make up this UnionDataset
    children = function() dataset___UnionDataset__children(self)
  )
)

#' @name InMemoryDataset
#' @rdname Dataset
#' @export
InMemoryDataset <- R6Class("InMemoryDataset", inherit = Dataset)

# Create an InMemoryDataset from `x`. Anything that is not already a `Table`
# is first converted with `Table$create()`.
InMemoryDataset$create <- function(x) {
  if (!arrow_with_dataset()) {
    stop("This build of the arrow package does not support Datasets", call. = FALSE)
  }
  tbl <- if (inherits(x, "Table")) x else Table$create(x)
  dataset___InMemoryDataset__create(tbl)
}


#' @export
names.Dataset <- function(x) {
  # A Dataset's names are whatever names() reports for its schema.
  ds_schema <- x$schema
  names(ds_schema)
}

#' @export
dim.Dataset <- function(x) {
  # Dimensions are reported as rows first, then columns.
  n_rows <- x$num_rows
  n_cols <- x$num_cols
  c(n_rows, n_cols)
}

#' @export
c.Dataset <- function(...) {
  # Combine by handing all arguments, as one list, to Dataset$create().
  datasets <- list(...)
  Dataset$create(datasets)
}

#' @export
head.Dataset <- function(x, n = 6L, ...) {
  # Delegate to head() on a Scanner created over `x`; `...` is not used.
  scanner <- Scanner$create(x)
  head(scanner, n)
}

#' @export
tail.Dataset <- function(x, n = 6L, ...) {
  # Delegate to tail() on a Scanner created over `x`; `...` is not used.
  scanner <- Scanner$create(x)
  tail(scanner, n)
}

#' @export
`[.Dataset` <- function(x, i, j, ..., drop = FALSE) {
  # With exactly two arguments the call was x[i]: treat `i` as a column
  # selection, the way a list/data.frame would, by re-dispatching as x[, i].
  if (nargs() == 2L) {
    return(x[, i])
  }

  result <- x
  # Columns are selected first ...
  if (!missing(j)) {
    result <- select.Dataset(result, all_of(j))
  }
  # ... then rows are taken from that selection.
  if (!missing(i)) {
    result <- take_dataset_rows(result, i)
  }
  result
}

# Take rows of a dataset by position.
#
# @param x object to scan; it is passed to `Scanner$create()`
# @param i numeric vector of 1-based row positions. `NA`, zero and negative
#   values are rejected.
# @return the result of `dataset___Scanner__TakeRows()` for those positions
take_dataset_rows <- function(x, i) {
  # Validate up front, before a Scanner is created:
  # * anyNA() keeps an NA from reaching `if`, which would otherwise fail with
  #   "missing value where TRUE/FALSE needed";
  # * `i < 1` also rejects 0, which is not a positive index and would
  #   otherwise be forwarded as -1 after the shift below.
  if (!is.numeric(i) || anyNA(i) || any(i < 1)) {
    stop("Only slicing with positive indices is supported", call. = FALSE)
  }
  scanner <- Scanner$create(x)
  # Shift R's 1-based positions down by one before handing them to TakeRows
  i <- Array$create(i - 1)
  dataset___Scanner__TakeRows(scanner, i)
}