Jonathan Keane authored 1be837f721e
ARROW-12688: [R] Use DuckDB to query an Arrow Dataset

An interface for using DuckDB + Arrow together. I've added two methods:

* The proposed `summarise(..., .engine = "duckdb")` method, which is (probably) the method that people want to use
* A lower-level method of specifying exactly when the transfer takes place: `to_duckdb()`, which registers the data with DuckDB and returns a `tbl` that can be used in dplyr pipelines

For example, the following two pipelines are equivalent and will group by the `lgl` column and then use DuckDB to do the group aggregations:

```
ds <- InMemoryDataset$create(example_data)

ds %>%
  group_by(lgl) %>%
  summarise(
    mean_int = mean(int, na.rm = TRUE),
    mean_dbl = mean(dbl, na.rm = TRUE),
    .engine = "duckdb"
  )

ds %>%
  to_duckdb() %>%
  group_by(lgl) %>%
  summarise(
    mean_int = mean(int, na.rm = TRUE),
    mean_dbl = mean(dbl, na.rm = TRUE)
  ) %>%
  collect()
```

And doing a large aggregation on our taxi dataset (TL;DR: with duckdb it completes in 27 sec on my MacPro):

```
> library(arrow)
> library(dplyr)
>
> ds <- open_dataset("~/repos/ab_store/data/taxi_parquet/", partitioning = c("year", "month"))
>
> system.time({
+   results <- ds %>%
+     group_by(passenger_count, payment_type) %>%
+     summarise(
+       fare_amount_mean = mean(fare_amount, na.rm = TRUE),
+       tip_amount_mean = mean(tip_amount, na.rm = TRUE),
+       trip_distance_mean = mean(trip_distance, na.rm = TRUE),
+       .engine = "duckdb"
+     ) %>%
+     collect()
+ })
   user  system elapsed
190.925  15.646  27.398
> results
# A tibble: 241 × 5
# Groups:   passenger_count [59]
   passenger_count payment_type fare_amount_mean tip_amount_mean
             <int> <chr>                   <dbl>           <dbl>
 1               5 Credit                  11.5       2.27
 2               2 CASH                     9.61      0.00000552
 3               6 CASH                     9.26      0
 4               1 CASH                     9.27      0.00000590
 5               3 CASH                     9.56      0.0000275
 6               5 CASH                     9.34      0.00000305
 7               2 Credit                  11.9       2.22
 8               1 Credit                  11.6       2.15
 9               3 Credit                  11.5       2.17
10               1 Cash                     8.92      0.00206
# … with 231 more rows, and 1 more variable: trip_distance_mean <dbl>
>
```

Using `to_duckdb()` takes longer since there is no filter pushdown (yet), so all of the columns are being read and then discarded (TL;DR: with duckdb it completes in 72 sec on my MacPro):

```
> library(arrow)
> library(dplyr)
>
> ds <- open_dataset("~/repos/ab_store/data/taxi_parquet/", partitioning = c("year", "month"))
>
> system.time({
+   results <- ds %>%
+     select(-rate_code_id) %>% # this shouldn't be necessary, but duckdb hangs without it since it does not (yet) do filter pushdown
+     to_duckdb() %>%
+     group_by(passenger_count, payment_type) %>%
+     summarise(
+       fare_amount_mean = mean(fare_amount, na.rm = TRUE),
+       tip_amount_mean = mean(tip_amount, na.rm = TRUE),
+       trip_distance_mean = mean(trip_distance, na.rm = TRUE)
+     ) %>%
+     collect()
+ })
`summarise()` has grouped output by 'passenger_count'. You can override using the `.groups` argument.
   user  system elapsed
433.151  49.224  72.044
> results
# A tibble: 241 × 5
# Groups:   passenger_count [59]
   passenger_count payment_type fare_amount_mean tip_amount_mean
             <int> <chr>                   <dbl>           <dbl>
 1               1 Credit                  11.6       2.15
 2               1 Cash                     8.92      0.00206
 3               2 Cash                     9.94      0.00185
 4               3 Cash                     9.87      0.00142
 5               4 Cash                    10.1       0.00113
 6               1 CASH                     9.27      0.00000590
 7               1 No Charge                9.83      0.0103
 8               2 CASH                     9.61      0.00000552
 9               1 CREDIT                  12.1       2.26
10               2 Credit                  11.9       2.22
# … with 231 more rows, and 1 more variable: trip_distance_mean <dbl>
>
```

Finally, with R + dplyr (TL;DR: it does not complete, even on a system with 128 GB of memory and even if I only pull + aggregate one column):

```
> library(arrow)
> library(dplyr)
>
> ds <- open_dataset("~/repos/ab_store/data/taxi_parquet/", partitioning = c("year", "month"))
>
> system.time({
+   results <- ds %>%
+     select(fare_amount, tip_amount, trip_distance, passenger_count, payment_type) %>%
+     collect() %>%
+     group_by(passenger_count, payment_type) %>%
+     summarise(
+       fare_amount_mean = mean(fare_amount, na.rm = TRUE),
+       tip_amount_mean = mean(tip_amount, na.rm = TRUE),
+       trip_distance_mean = mean(trip_distance, na.rm = TRUE)
+     )
+ })
Error: Internal error in `dict_hash_with()`: Dictionary is full.
Run `rlang::last_error()` to see where the error occurred.
Timing stopped at: 190.2 151.7 115.9
>
> rlang::last_error()
<error/rlang_error>
Internal error in `dict_hash_with()`: Dictionary is full.
Backtrace:
 1. base::system.time(...)
 5. dplyr:::group_by.data.frame(., passenger_count, payment_type)
 6. dplyr::grouped_df(groups$data, groups$group_names, .drop)
 7. dplyr:::compute_groups(data, vars, drop = drop)
 8. dplyr:::vec_split_id_order(group_vars)
 9. vctrs::vec_group_loc(x)
Run `rlang::last_trace()` to see the full context.
```

Closes #10780 from jonkeane/ARROW-12688-duckdb-passoff

Authored-by: Jonathan Keane <jkeane@gmail.com>
Signed-off-by: Jonathan Keane <jkeane@gmail.com>
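For reference, a minimal self-contained sketch of the `to_duckdb()` round trip, using `Table$create()` on the built-in `mtcars` data frame instead of the taxi dataset so it runs anywhere the arrow and duckdb packages are installed (this is an illustrative example, not part of the change itself):

```r
library(arrow)
library(dplyr)

# Wrap an in-memory data frame as an Arrow Table
tab <- Table$create(mtcars)

# Register the Table with DuckDB, aggregate there, then pull results back into R
result <- tab %>%
  to_duckdb() %>%
  group_by(cyl) %>%
  summarise(mean_mpg = mean(mpg, na.rm = TRUE)) %>%
  collect()

result
```

Because `to_duckdb()` returns an ordinary `tbl`, everything after it is regular dbplyr-translated dplyr; `collect()` is what materializes the result as a tibble in R.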