DuckDB + dplyr (R)

Use a familiar R frontend

Load libraries

library(duckdb)
Loading required package: DBI
library(dplyr, warn.conflicts = FALSE)
library(tidyr, warn.conflicts = FALSE)

Create a database connection

For the d(b)plyr workflow, the connection step is very similar to the pure SQL approach. The only difference is that, after instantiating the database connection, we need to register our parquet dataset as a table in our connection via the dplyr::tbl() function. Note that we also assign it to an object (here: nyc) that can be referenced from R.

## Instantiate the in-memory DuckDB connection 
con = dbConnect(duckdb(), shutdown = TRUE)

## Register our parquet dataset as a table in our connection (and assign it
## to an object that R understands)
# nyc = tbl(con, "nyc-taxi/**/*.parquet") # works, but safer to use the read_parquet function
nyc = tbl(con, "read_parquet('nyc-taxi/**/*.parquet', hive_partitioning = true)")
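
As an aside, nothing about this workflow requires an in-memory database. A minimal variation (where the "nyc.duckdb" file path is just an illustrative choice) would persist everything to disk instead:

## Alternative (sketch): a persistent, disk-backed database
# con = dbConnect(duckdb(), dbdir = "nyc.duckdb")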

First example

This next command runs instantly because all computation is deferred (i.e., lazy eval). In other words, it is just a query object.

q1 = nyc |>
  summarize(
    mean_tip = mean(tip_amount),
    .by = passenger_count
  )
.by versus group_by

In case you weren’t aware: summarize(..., .by = x) is a shorthand (and non-persistent) version of group_by(x) |> summarize(...). The equivalence is sketched below.
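
For example, the following two queries describe the same aggregation; the only difference is that the grouping in the second version would persist through later operations.

## Equivalent ways to express the same grouped aggregation
nyc |> summarize(mean_tip = mean(tip_amount), .by = passenger_count)
nyc |> group_by(passenger_count) |> summarize(mean_tip = mean(tip_amount))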

We can see what DuckDB’s query tree looks like by asking it to explain the plan.

explain(q1)
Warning: Missing values are always removed in SQL aggregation functions.
Use `na.rm = TRUE` to silence this warning
This warning is displayed once every 8 hours.
<SQL>
SELECT passenger_count, AVG(tip_amount) AS mean_tip
FROM (FROM read_parquet('nyc-taxi/**/*.parquet', hive_partitioning = true)) q01
GROUP BY passenger_count

<PLAN>
physical_plan
┌───────────────────────────┐
│       HASH_GROUP_BY       │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│             #0            │
│          avg(#1)          │
└─────────────┬─────────────┘                             
┌─────────────┴─────────────┐
│         PROJECTION        │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│      passenger_count      │
│         tip_amount        │
└─────────────┬─────────────┘                             
┌─────────────┴─────────────┐
│       READ_PARQUET        │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│      passenger_count      │
│         tip_amount        │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│       EC: 179629584       │
└───────────────────────────┘                             

Similarly, we can show the SQL translation that will be implemented on the backend using show_query().

show_query(q1)
<SQL>
SELECT passenger_count, AVG(tip_amount) AS mean_tip
FROM (FROM read_parquet('nyc-taxi/**/*.parquet', hive_partitioning = true)) q01
GROUP BY passenger_count

Note that printing the query object does trigger some computation. On the other hand, it’s still just a preview of the data (we haven’t pulled everything into R’s memory).

q1
# Source:   SQL [?? x 2]
# Database: DuckDB v0.10.1 [gmcd@Darwin 23.4.0:R 4.4.0/:memory:]
   passenger_count mean_tip
             <dbl>    <dbl>
 1               5    1.10 
 2              65    0    
 3               9    0.807
 4             177    1    
 5               0    0.862
 6             254    0    
 7             249    0    
 8               7    0.544
 9               8    0.351
10              66    1.5  
# ℹ more rows
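
Another tell that we’re still operating lazily (this reflects standard dbplyr behaviour): functions that require the full dataset, like nrow(), can’t resolve yet.

nrow(q1) ## returns NA, since the row count is unknown until we materialise the data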

To actually pull all of the result data into R, we must call collect() on the query object.

tic = Sys.time()
dat1 = collect(q1)
toc = Sys.time()

dat1
# A tibble: 18 × 2
   passenger_count mean_tip
             <dbl>    <dbl>
 1               0    0.862
 2             254    0    
 3             249    0    
 4               5    1.10 
 5              65    0    
 6               9    0.807
 7             177    1    
 8               2    1.08 
 9             208    0    
10              10    0    
11               4    0.845
12               1    1.15 
13             247    2.3  
14               3    0.963
15               6    1.13 
16               7    0.544
17               8    0.351
18              66    1.5  
toc - tic
Time difference of 1.2924 secs
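
Relatedly, if the goal is to reuse an intermediate result without bringing it into R at all, dbplyr’s compute() materialises the query as a (temporary) table on the DuckDB side and hands back a new lazy tbl. A one-line sketch:

## Alternative to collect(): materialise inside DuckDB and stay lazy in R
# dat1_db = compute(q1)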

Aggregation

Here’s our earlier filtering example with multiple grouping + aggregation variables…

q2 = nyc |>
  filter(month <= 3) |>
  summarize(
    across(c(tip_amount, fare_amount), mean),
    .by = c(month, passenger_count)
  )
q2
# Source:   SQL [?? x 4]
# Database: DuckDB v0.10.1 [gmcd@Darwin 23.4.0:R 4.4.0/:memory:]
   month passenger_count tip_amount fare_amount
   <dbl>           <dbl>      <dbl>       <dbl>
 1     1               1      1.04         9.76
 2     2               1      1.07         9.90
 3     3               1      1.09        10.2 
 4     1               8      0           21.3 
 5     2               8      0.5          8.23
 6     1               7      0            6.3 
 7     1               3      0.875        9.87
 8     1               6      1.02         9.86
 9     2               3      0.895        9.98
10     2               6      1.02         9.96
# ℹ more rows

Aside: note that the optimised query includes hash groupings and projection (basically, fancy column subsetting, which is a surprisingly effective strategy in query optimisation).

explain(q2)
<SQL>
SELECT
  "month",
  passenger_count,
  AVG(tip_amount) AS tip_amount,
  AVG(fare_amount) AS fare_amount
FROM (
  SELECT q01.*
  FROM (FROM read_parquet('nyc-taxi/**/*.parquet', hive_partitioning = true)) q01
  WHERE ("month" <= 3.0)
) q01
GROUP BY "month", passenger_count

<PLAN>
physical_plan
┌───────────────────────────┐
│       HASH_GROUP_BY       │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│             #0            │
│             #1            │
│          avg(#2)          │
│          avg(#3)          │
└─────────────┬─────────────┘                             
┌─────────────┴─────────────┐
│         PROJECTION        │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│           month           │
│      passenger_count      │
│         tip_amount        │
│        fare_amount        │
└─────────────┬─────────────┘                             
┌─────────────┴─────────────┐
│       READ_PARQUET        │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│      passenger_count      │
│        fare_amount        │
│         tip_amount        │
│           month           │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│ File Filters: (month <= 3)│
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│        EC: 44907396       │
└───────────────────────────┘                             

And here’s our high-dimensional aggregation example. We’ll create a query object for it first, since I’ll reuse it again shortly.

q3 = nyc |>
  group_by(passenger_count, trip_distance) |>
  summarize(
    across(c(tip_amount, fare_amount), mean)
  )
collect(q3)
# A tibble: 25,569 × 4
# Groups:   passenger_count [18]
   passenger_count trip_distance tip_amount fare_amount
             <dbl>         <dbl>      <dbl>       <dbl>
 1               1          0.7       0.493        5.04
 2               2          0.8       0.462        5.44
 3               1          0.8       0.535        5.36
 4               1          1         0.616        6.01
 5               1          0.3       0.334        4.11
 6               1          4.8       1.70        16.0 
 7               2          1         0.525        6.08
 8               1          6.7       2.13        20.1 
 9               1          0.4       0.361        4.17
10               2          0.98      0.542        5.85
# ℹ 25,559 more rows

Pivot (reshape)

dbplyr also translates key tidyr verbs like pivot_longer(), so we can reshape the lazy query result before collecting it.

# library(tidyr) ## already loaded

q3 |>
  pivot_longer(tip_amount:fare_amount) |>
  collect()
# A tibble: 51,138 × 4
# Groups:   passenger_count [18]
   passenger_count trip_distance name       value
             <dbl>         <dbl> <chr>      <dbl>
 1               1           2.2 tip_amount 1.01 
 2               1          16.8 tip_amount 5.58 
 3               1           2.7 tip_amount 1.16 
 4               1           5.9 tip_amount 1.93 
 5               1           8.4 tip_amount 2.93 
 6               0           2.4 tip_amount 0.924
 7               1           3.8 tip_amount 1.46 
 8               1           5.4 tip_amount 1.83 
 9               1           9.3 tip_amount 3.40 
10               1           9.8 tip_amount 3.60 
# ℹ 51,128 more rows

Joins (merges)

mean_tips  = nyc |> summarise(mean_tips = mean(tip_amount), .by = month)
mean_fares = nyc |> summarise(mean_fares = mean(fare_amount), .by = month)

Again, these commands complete instantly because all computation has been deferred until absolutely necessary (i.e., lazy eval).

left_join(
  mean_fares,
  mean_tips
  ) |>
  collect()
Joining with `by = join_by(month)`
Warning: Missing values are always removed in SQL aggregation functions.
Use `na.rm = TRUE` to silence this warning
This warning is displayed once every 8 hours.
# A tibble: 12 × 3
   month mean_fares mean_tips
   <dbl>      <dbl>     <dbl>
 1    11      12.3       1.25
 2     7      10.4       1.06
 3     8      10.5       1.08
 4     1       9.81      1.01
 5     4      10.3       1.04
 6     5      10.6       1.08
 7     9      12.4       1.25
 8    10      12.5       1.28
 9     2       9.94      1.04
10    12      12.3       1.24
11     3      10.2       1.06
12     6      10.5       1.09
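
If we wanted to be explicit about the join key (and silence the join_by message above), we could spell out the by argument ourselves:

## Same join, with an explicit key
left_join(
  mean_fares,
  mean_tips,
  by = join_by(month)
  ) |>
  collect()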

Windowing

If you recall from the native SQL API, we sampled 1 percent of the data before creating decile bins to reduce the computational burden of sorting the entire table. Unfortunately, this approach doesn’t work as well for the dplyr frontend, because the underlying SQL translation uses a generic sampling approach (rather than DuckDB’s optimised USING SAMPLE statement).
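
One possible workaround, sketched below under the assumption that dbplyr passes the embedded SQL through untouched, is to push DuckDB’s USING SAMPLE syntax into the table registration itself via dbplyr::sql(), and then lean on dbplyr’s window function translations (ntile() becomes NTILE() OVER (...)) for the decile bins. Treat this as an untested sketch rather than a drop-in replacement for the native SQL version.

## Sketch: register a 1% sample with DuckDB's native sampling syntax...
nyc_sampled = tbl(con, dbplyr::sql(
  "SELECT * FROM read_parquet('nyc-taxi/**/*.parquet', hive_partitioning = true) USING SAMPLE 1%"
))

## ... then bin and aggregate as usual
nyc_sampled |>
  mutate(decile = ntile(fare_amount, 10)) |>
  summarize(mean_tip = mean(tip_amount), .by = decile) |>
  collect()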

Close connection

dbDisconnect(con)
