library(duckdb)
Loading required package: DBI
library(dplyr, warn.conflicts = FALSE)
library(tidyr, warn.conflicts = FALSE)
Use a familiar R frontend
For the d(b)plyr workflow, the connection step is very similar to the pure SQL approach. The only difference is that, after instantiating the database connection, we need to register our parquet dataset as a table in our connection via the dplyr::tbl() function. Note that we also assign it to an object (here: nyc) that can be referenced from R.
## Instantiate the in-memory DuckDB connection
con = dbConnect(duckdb(), shutdown = TRUE)
## Register our parquet dataset as a table in our connection (and assign it to
## an object that R understands)
# nyc = tbl(con, "nyc-taxi/**/*.parquet") # works, but safer to use the read_parquet func
nyc = tbl(con, "read_parquet('nyc-taxi/**/*.parquet', hive_partitioning = true)")
This next command runs instantly because all computation is deferred (i.e., lazy eval). In other words, it is just a query object.
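The command in question might look something like the following sketch, where the object name q1 is my placeholder for the elided chunk (the column names match the SQL translation shown further below):
## Build a deferred aggregation query: mean tip by passenger count
q1 = nyc |>
  summarise(mean_tip = mean(tip_amount), .by = passenger_count)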
.by versus group_by
In case you weren’t aware: summarize(..., .by = x) is a shorthand (and non-persistent) version of group_by(x) |> summarize(...). More details here.
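As a purely illustrative example with the columns used here, these two calls return the same aggregates, except that the second leaves the result grouped by passenger_count:
nyc |> summarise(mean_tip = mean(tip_amount), .by = passenger_count)
nyc |> group_by(passenger_count) |> summarise(mean_tip = mean(tip_amount))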
We can see what DuckDB’s query tree looks like by asking it to explain the plan.
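In code, that might look something like this (again assuming the query object was saved as q1):
## Ask DuckDB to explain the plan for our deferred query
explain(q1)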
Warning: Missing values are always removed in SQL aggregation functions.
Use `na.rm = TRUE` to silence this warning
This warning is displayed once every 8 hours.
<SQL>
SELECT passenger_count, AVG(tip_amount) AS mean_tip
FROM (FROM read_parquet('nyc-taxi/**/*.parquet', hive_partitioning = true)) q01
GROUP BY passenger_count
<PLAN>
physical_plan
┌───────────────────────────┐
│ HASH_GROUP_BY │
│ ──────────────────── │
│ Groups: #0 │
│ Aggregates: avg(#1) │
│ │
│ ~113,547,553 rows │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ PROJECTION │
│ ──────────────────── │
│ passenger_count │
│ tip_amount │
│ │
│ ~179,629,584 rows │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ READ_PARQUET │
│ ──────────────────── │
│ Function: │
│ READ_PARQUET │
│ │
│ Projections: │
│ passenger_count │
│ tip_amount │
│ │
│ ~179,629,584 rows │
└───────────────────────────┘
Similarly, to show the SQL translation that will be implemented on the backend, we can use show_query().
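Assuming the same q1 object, a minimal sketch would be:
## Print the dbplyr-generated SQL without executing anything
show_query(q1)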
<SQL>
SELECT passenger_count, AVG(tip_amount) AS mean_tip
FROM (FROM read_parquet('nyc-taxi/**/*.parquet', hive_partitioning = true)) q01
GROUP BY passenger_count
Note that printing the query object actually does enforce some computation. OTOH it’s still just a preview of the data (we haven’t pulled everything into R’s memory).
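For example, simply typing the (assumed) q1 object name at the console gives a preview like the one below:
q1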
# Source: SQL [?? x 2]
# Database: DuckDB 1.4.3 [root@Darwin 24.6.0:R 4.5.2/:memory:]
passenger_count mean_tip
<dbl> <dbl>
1 7 0.544
2 5 1.10
3 9 0.807
4 177 1
5 65 0
6 208 0
7 10 0
8 2 1.08
9 1 1.15
10 8 0.351
11 66 1.5
12 247 2.3
13 249 0
14 4 0.845
15 0 0.862
16 254 0
17 3 0.963
18 6 1.13
To actually pull all of the result data into R, we must call collect() on the query object.
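A sketch of that step, where the result name (dat1) and timing objects (tic, toc) are my own placeholders to match the elapsed-time printout below:
tic = Sys.time()
dat1 = collect(q1)  ## materialise the full result as a tibble in R memory
toc = Sys.time()
dat1
toc - tic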
# A tibble: 18 × 2
passenger_count mean_tip
<dbl> <dbl>
1 0 0.862
2 254 0
3 5 1.10
4 9 0.807
5 65 0
6 177 1
7 247 2.3
8 208 0
9 10 0
10 7 0.544
11 8 0.351
12 66 1.5
13 249 0
14 4 0.845
15 1 1.15
16 2 1.08
17 3 0.963
18 6 1.13
Time difference of 0.3836122 secs
Here’s our earlier filtering example with multiple grouping + aggregation variables…
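Judging from the SQL translation shown a bit further below, the underlying query is probably something like this sketch (q2 is my placeholder name):
## Filter on the hive-partitioned month variable, then aggregate by two groups
q2 = nyc |>
  filter(month <= 3) |>
  summarise(
    across(c(tip_amount, fare_amount), mean),
    .by = c(month, passenger_count)
  )
q2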
# Source: SQL [?? x 4]
# Database: DuckDB 1.4.3 [root@Darwin 24.6.0:R 4.5.2/:memory:]
month passenger_count tip_amount fare_amount
<dbl> <dbl> <dbl> <dbl>
1 2 5 1.02 9.98
2 2 9 0 2.5
3 3 1 1.09 10.2
4 3 208 0 3.3
5 1 7 0 6.3
6 1 3 0.875 9.87
7 1 6 1.02 9.86
8 2 8 0.5 8.23
9 1 208 0 3.77
10 2 0 0.877 8.77
# ℹ more rows
Aside: note that the optimised query includes hash groupings and projection (basically: fancy column subsetting, which is a surprisingly effective strategy in query optimisation).
<SQL>
SELECT
"month",
passenger_count,
AVG(tip_amount) AS tip_amount,
AVG(fare_amount) AS fare_amount
FROM (
SELECT q01.*
FROM (FROM read_parquet('nyc-taxi/**/*.parquet', hive_partitioning = true)) q01
WHERE ("month" <= 3.0)
) q01
GROUP BY "month", passenger_count
<PLAN>
physical_plan
┌───────────────────────────┐
│ HASH_GROUP_BY │
│ ──────────────────── │
│ Groups: │
│ #0 │
│ #1 │
│ │
│ Aggregates: │
│ avg(#2) │
│ avg(#3) │
│ │
│ ~44,907,395 rows │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ PROJECTION │
│ ──────────────────── │
│ month │
│ passenger_count │
│ tip_amount │
│ fare_amount │
│ │
│ ~44,907,396 rows │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ READ_PARQUET │
│ ──────────────────── │
│ Function: │
│ READ_PARQUET │
│ │
│ Projections: │
│ passenger_count │
│ fare_amount │
│ tip_amount │
│ month │
│ │
│ File Filters: │
│ (CAST(month AS DECIMAL(20 │
│ ,1)) <= 3.0) │
│ │
│ Scanning Files: 3/12 │
│ │
│ ~44,907,396 rows │
└───────────────────────────┘
And our high-dimensional aggregation example. We’ll create a query for this first, since I’ll reuse it again shortly.
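A sketch of that query and its collection, where q3 is my placeholder name and the grouping and aggregation columns are taken from the output below:
## Mean tips and fares by passenger count AND trip distance
q3 = nyc |>
  group_by(passenger_count, trip_distance) |>
  summarise(across(c(tip_amount, fare_amount), mean))
collect(q3)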
`summarise()` has grouped output by "passenger_count". You can override using
the `.groups` argument.
# A tibble: 25,569 × 4
# Groups: passenger_count [18]
passenger_count trip_distance tip_amount fare_amount
<dbl> <dbl> <dbl> <dbl>
1 4 0.6 0.278 4.92
2 1 4.9 1.73 16.2
3 2 1.6 0.697 7.88
4 1 5.1 1.76 16.6
5 1 9.1 3.33 25.1
6 3 0.8 0.414 5.47
7 1 3.95 1.44 14.0
8 3 1 0.467 6.12
9 5 0.53 0.367 4.31
10 0 1.6 0.728 7.36
# ℹ 25,559 more rows
`summarise()` has grouped output by "passenger_count". You can override using
the `.groups` argument.
`summarise()` has grouped output by "passenger_count". You can override using
the `.groups` argument.
# A tibble: 51,138 × 4
# Groups: passenger_count [18]
passenger_count trip_distance name value
<dbl> <dbl> <chr> <dbl>
1 1 0.76 tip_amount 0.498
2 1 0.78 tip_amount 0.503
3 1 1.84 tip_amount 0.857
4 1 0.69 tip_amount 0.469
5 1 1.26 tip_amount 0.672
6 1 7.6 tip_amount 2.42
7 1 1.21 tip_amount 0.658
8 1 1.6 tip_amount 0.816
9 2 22.2 tip_amount 4.10
10 3 2 tip_amount 0.725
# ℹ 51,128 more rows
Again, these commands complete instantly because all computation has been deferred until absolutely necessary (i.e., lazy eval).
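Those commands, plus the join and collect() step that finally triggers computation, might look something like this sketch (the mean_fares and mean_tips object names are my assumptions):
## Two deferred queries...
mean_fares = nyc |> summarise(mean_fares = mean(fare_amount), .by = month)
mean_tips  = nyc |> summarise(mean_tips  = mean(tip_amount),  .by = month)
## ... joined and collected only at the end
left_join(mean_fares, mean_tips) |> collect()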
Joining with `by = join_by(month)`
# A tibble: 12 × 3
month mean_fares mean_tips
<dbl> <dbl> <dbl>
1 8 10.5 1.08
2 6 10.5 1.09
3 9 12.4 1.25
4 5 10.6 1.08
5 7 10.4 1.06
6 3 10.2 1.06
7 2 9.94 1.04
8 4 10.3 1.04
9 12 12.3 1.24
10 11 12.3 1.25
11 10 12.5 1.28
12 1 9.81 1.01
If you recall from the native SQL API, we sampled 1 percent of the data before creating decile bins to reduce the computational burden of sorting the entire table. Unfortunately, this approach doesn’t work as well for the dplyr frontend, because the underlying SQL translation uses a generic sampling approach (rather than DuckDB’s optimised USING SAMPLE statement).
When going through the arrow intermediary, we don’t need to establish a database connection with DBI::dbConnect like we did above. Instead, we can create a link (pointer) to the dataset on disk directly via the arrow::open_dataset() convenience function. Here I’ll assign it to a new R object called nyc2.
(For individual parquet files, we could just read them via arrow::read_parquet(), perhaps efficiently subsetting columns at the same time. But I find that open_dataset() is generally what I’m looking for.)
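A minimal sketch, assuming the same local nyc-taxi directory of hive-partitioned parquet files used above:
library(arrow)
## Create a pointer to the on-disk dataset; nothing is read into memory yet
nyc2 = open_dataset("nyc-taxi")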
Note that printing our nyc2 dataset to the R console will just display the data schema. This is a cheap and convenient way to quickly interrogate the basic structure of your data, including column types, etc.
FileSystemDataset with 12 Parquet files
24 columns
vendor_name: string
pickup_datetime: timestamp[ms]
dropoff_datetime: timestamp[ms]
passenger_count: int64
trip_distance: double
pickup_longitude: double
pickup_latitude: double
rate_code: string
store_and_fwd: string
dropoff_longitude: double
dropoff_latitude: double
payment_type: string
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
total_amount: double
improvement_surcharge: double
congestion_surcharge: double
...
4 more columns
Use `schema()` to see entire schema
The key step for this “arrow + duckdb” dplyr workflow is to pass our arrow dataset to DuckDB via the to_duckdb() function.
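In code, that key step is simply the following (to_duckdb() comes from the arrow package; the printed preview below shows the resulting lazy table):
## Register the arrow dataset as a virtual DuckDB table (zero-copy, still lazy)
nyc2 |> to_duckdb()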
# Source: table<arrow_001> [?? x 24]
# Database: DuckDB 1.4.3 [root@Darwin 24.6.0:R 4.5.2/:memory:]
vendor_name pickup_datetime dropoff_datetime passenger_count
<chr> <dttm> <dttm> <dbl>
1 CMT 2012-01-20 14:09:36 2012-01-20 14:42:25 1
2 CMT 2012-01-20 14:54:10 2012-01-20 15:06:55 1
3 CMT 2012-01-20 08:08:01 2012-01-20 08:11:02 1
4 CMT 2012-01-20 08:36:22 2012-01-20 08:39:44 1
5 CMT 2012-01-20 20:58:32 2012-01-20 21:03:04 1
6 CMT 2012-01-20 19:40:20 2012-01-20 19:43:43 2
7 CMT 2012-01-21 01:54:37 2012-01-21 02:08:02 2
8 CMT 2012-01-21 01:55:47 2012-01-21 02:08:51 3
9 VTS 2012-01-07 22:20:00 2012-01-07 22:27:00 2
10 VTS 2012-01-10 07:11:00 2012-01-10 07:21:00 1
# ℹ more rows
# ℹ 20 more variables: trip_distance <dbl>, pickup_longitude <dbl>,
# pickup_latitude <dbl>, rate_code <chr>, store_and_fwd <chr>,
# dropoff_longitude <dbl>, dropoff_latitude <dbl>, payment_type <chr>,
# fare_amount <dbl>, extra <dbl>, mta_tax <dbl>, tip_amount <dbl>,
# tolls_amount <dbl>, total_amount <dbl>, improvement_surcharge <dbl>,
# congestion_surcharge <dbl>, pickup_location_id <dbl>, …
Note that this transfer from Arrow to DuckDB is very quick (and memory cheap) because it is a zero-copy operation. We are just passing around pointers instead of actually moving any data. See this blog post for more details, but the high-level takeaway is that we are benefitting from the tightly integrated architectures of these two libraries.1
At this point, all of the regular dplyr workflow logic from above should carry over. Just remember to first pass the arrow dataset via the to_duckdb() function. For example, here’s our initial aggregation query again:
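A sketch of that query, reusing the same column names as before:
nyc2 |>
  to_duckdb() |>
  summarise(mean_tip = mean(tip_amount), .by = passenger_count) |>
  collect()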
# A tibble: 18 × 2
passenger_count mean_tip
<dbl> <dbl>
1 247 2.3
2 5 1.10
3 9 0.807
4 65 0
5 177 1
6 7 0.544
7 249 0
8 1 1.15
9 0 0.862
10 254 0
11 208 0
12 10 0
13 4 0.845
14 8 0.351
15 66 1.5
16 3 0.963
17 6 1.13
18 2 1.08
Some of you may be used to performing computation with the arrow package without going through DuckDB. What’s happening here is that arrow provides its own computation engine, called “acero”. This Arrow-native engine is actually pretty performant… albeit not as fast as DuckDB, nor as feature rich. So I personally recommend always passing to DuckDB if you can. Still, if you’re curious, you can test this yourself by re-trying the code chunk above but commenting out the to_duckdb() line. For more details, see here.
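For reference, here is a hedged sketch of the arrow-only (acero) version, which simply drops the to_duckdb() step; I use group_by() here because I’m not certain that acero’s dplyr bindings support the .by shorthand:
nyc2 |>
  # to_duckdb() |>   ## omitted: computation now runs on arrow's acero engine
  group_by(passenger_count) |>
  summarise(mean_tip = mean(tip_amount)) |>
  collect()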
The new(ish) kid on the block is duckplyr (announcement / homepage). Without going into too much depth, the promise of duckplyr is that it can provide a “fully native” dplyr experience that is directly coupled to DuckDB’s query engine. So, for example, it won’t have to rely on dbplyr’s generic SQL translations. Instead, the relevant dplyr “verbs” are translated directly to DuckDB’s relational API to construct logical query plans. If that’s too much jargon, just know that it should involve less overhead, fewer translation errors, and better optimisation. Moreover, a goal of duckplyr is to be a drop-in replacement for dplyr in general. In other words, you could just swap out library(dplyr) for library(duckplyr) and all of your data wrangling operations would be backed by the power of DuckDB. This includes working on “regular” R data frames in memory.
All of this is exciting and I expect that duckplyr will become the default R -> DuckDB interface for many users… and perhaps their default data wrangling tool, full stop. There are still a few rough edges IMO (e.g., factor variables aren’t fully supported yet). But the basics are certainly all there. For example:
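Here is a sketch under the assumption of a recent duckplyr release; the read_parquet_duckdb() reader and the nyc3 object name are my guesses at the elided chunk:
library(duckplyr)
## Lazily scan the parquet files as a duckplyr data frame
nyc3 = read_parquet_duckdb("nyc-taxi/**/*.parquet")
nyc3 |>
  summarise(mean_tip = mean(tip_amount), .by = passenger_count)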
✔ Overwriting dplyr methods with duckplyr methods.
ℹ Turn off with `duckplyr::methods_restore()`.
# A duckplyr data frame: 2 variables
passenger_count mean_tip
<dbl> <dbl>
1 247 2.3
2 7 0.544
3 208 0
4 10 0
5 8 0.351
6 66 1.5
7 5 1.10
8 9 0.807
9 65 0
10 177 1
11 2 1.08
12 249 0
13 4 0.845
14 0 0.862
15 254 0
16 3 0.963
17 6 1.13
18 1 1.15
“Similar” might be a better description than “integrated”, since DuckDB does not use the Arrow memory model. But they are both column-oriented (among other things) and so the end result is pretty seamless integration.