library(duckdb)
Loading required package: DBI
library(dplyr, warn.conflicts = FALSE)
library(tidyr, warn.conflicts = FALSE)
Use a familiar R frontend
For the d(b)plyr workflow, the connection step is very similar to the pure SQL approach. The only difference is that, after instantiating the database connection, we need to register our parquet dataset as a table in our connection via the dplyr::tbl() function. Note that we also assign it to an object (here: nyc) that can be referenced from R.
## Instantiate the in-memory DuckDB connection
con = dbConnect(duckdb(), shutdown = TRUE)
## Register our parquet dataset as a table in our connection (and assign it to
## an object that R understands)
# nyc = tbl(con, "nyc-taxi/**/*.parquet") # works, but safer to use the read_parquet func
nyc = tbl(con, "read_parquet('nyc-taxi/**/*.parquet', hive_partitioning = true)")
This next command runs instantly because all computation is deferred (i.e., lazy eval). In other words, it is just a query object.
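Something like the following (a sketch: I've called the resulting query object q1, and the aggregation matches the SQL translation shown further below):
q1 = nyc |>
  summarize(
    mean_tip = mean(tip_amount),  ## mean tip per passenger count
    .by = passenger_count
  )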
.by versus group_by
In case you weren't aware: summarize(..., .by = x) is a shorthand (and non-persistent) version of group_by(x) |> summarize(...). More details here.
We can see what DuckDB's query tree looks like by asking it to explain the plan.
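For example, assuming the q1 query object sketched above:
explain(q1)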
Warning: Missing values are always removed in SQL aggregation functions.
Use `na.rm = TRUE` to silence this warning
This warning is displayed once every 8 hours.
<SQL>
SELECT passenger_count, AVG(tip_amount) AS mean_tip
FROM (FROM read_parquet('nyc-taxi/**/*.parquet', hive_partitioning = true)) q01
GROUP BY passenger_count
<PLAN>
physical_plan
┌───────────────────────────┐
│ HASH_GROUP_BY │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ #0 │
│ avg(#1) │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ PROJECTION │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ passenger_count │
│ tip_amount │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ READ_PARQUET │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ passenger_count │
│ tip_amount │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ EC: 179629584 │
└───────────────────────────┘
Similarly, we can show the SQL translation that will be implemented on the backend using show_query().
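Again with the q1 object:
show_query(q1)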
<SQL>
SELECT passenger_count, AVG(tip_amount) AS mean_tip
FROM (FROM read_parquet('nyc-taxi/**/*.parquet', hive_partitioning = true)) q01
GROUP BY passenger_count
Note that printing the query object actually does enforce some computation. OTOH it’s still just a preview of the data (we haven’t pulled everything into R’s memory).
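For example, printing q1 at the console:
q1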
# Source: SQL [?? x 2]
# Database: DuckDB v0.10.1 [gmcd@Darwin 23.4.0:R 4.4.0/:memory:]
passenger_count mean_tip
<dbl> <dbl>
1 5 1.10
2 65 0
3 9 0.807
4 177 1
5 0 0.862
6 254 0
7 249 0
8 7 0.544
9 8 0.351
10 66 1.5
# ℹ more rows
To actually pull all of the result data into R, we must call collect() on the query object.
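A sketch of that step, with a simple Sys.time() timer to match the elapsed time printed below (the dat1 name is illustrative):
tic = Sys.time()
dat1 = collect(q1)  ## pull the full result into R as a tibble
dat1
Sys.time() - tic    ## elapsed time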
# A tibble: 18 × 2
passenger_count mean_tip
<dbl> <dbl>
1 0 0.862
2 254 0
3 249 0
4 5 1.10
5 65 0
6 9 0.807
7 177 1
8 2 1.08
9 208 0
10 10 0
11 4 0.845
12 1 1.15
13 247 2.3
14 3 0.963
15 6 1.13
16 7 0.544
17 8 0.351
18 66 1.5
Time difference of 1.2924 secs
Here’s our earlier filtering example with multiple grouping + aggregation variables…
q2 = nyc |>
filter(month <= 3) |>
summarize(
across(c(tip_amount, fare_amount), mean),
.by = c(month, passenger_count)
)
q2
# Source: SQL [?? x 4]
# Database: DuckDB v0.10.1 [gmcd@Darwin 23.4.0:R 4.4.0/:memory:]
month passenger_count tip_amount fare_amount
<dbl> <dbl> <dbl> <dbl>
1 1 1 1.04 9.76
2 2 1 1.07 9.90
3 3 1 1.09 10.2
4 1 8 0 21.3
5 2 8 0.5 8.23
6 1 7 0 6.3
7 1 3 0.875 9.87
8 1 6 1.02 9.86
9 2 3 0.895 9.98
10 2 6 1.02 9.96
# ℹ more rows
Aside: note that the optimised query includes hash groupings and projection (basically: fancy column subsetting, which is a surprisingly effective strategy in query optimization).
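We can see this by asking for the plan of the q2 query object, as before:
explain(q2)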
<SQL>
SELECT
"month",
passenger_count,
AVG(tip_amount) AS tip_amount,
AVG(fare_amount) AS fare_amount
FROM (
SELECT q01.*
FROM (FROM read_parquet('nyc-taxi/**/*.parquet', hive_partitioning = true)) q01
WHERE ("month" <= 3.0)
) q01
GROUP BY "month", passenger_count
<PLAN>
physical_plan
┌───────────────────────────┐
│ HASH_GROUP_BY │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ #0 │
│ #1 │
│ avg(#2) │
│ avg(#3) │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ PROJECTION │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ month │
│ passenger_count │
│ tip_amount │
│ fare_amount │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ READ_PARQUET │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ passenger_count │
│ fare_amount │
│ tip_amount │
│ month │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ File Filters: (month <= 3)│
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
│ EC: 44907396 │
└───────────────────────────┘
And our high-dimensional aggregation example. We'll create a query object for this first, since I'll reuse it again shortly.
q3 = nyc |>
group_by(passenger_count, trip_distance) |>
summarize(
    across(c(tip_amount, fare_amount), mean)
  )
collect(q3)
# A tibble: 25,569 × 4
# Groups: passenger_count [18]
passenger_count trip_distance tip_amount fare_amount
<dbl> <dbl> <dbl> <dbl>
1 1 0.7 0.493 5.04
2 2 0.8 0.462 5.44
3 1 0.8 0.535 5.36
4 1 1 0.616 6.01
5 1 0.3 0.334 4.11
6 1 4.8 1.70 16.0
7 2 1 0.525 6.08
8 1 6.7 2.13 20.1
9 1 0.4 0.361 4.17
10 2 0.98 0.542 5.85
# ℹ 25,559 more rows
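Here's one way to get the long-format version shown below, reshaping the collected result with tidyr::pivot_longer() (a sketch):
collect(q3) |>
  pivot_longer(cols = c(tip_amount, fare_amount))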
# A tibble: 51,138 × 4
# Groups: passenger_count [18]
passenger_count trip_distance name value
<dbl> <dbl> <chr> <dbl>
1 1 2.2 tip_amount 1.01
2 1 16.8 tip_amount 5.58
3 1 2.7 tip_amount 1.16
4 1 5.9 tip_amount 1.93
5 1 8.4 tip_amount 2.93
6 0 2.4 tip_amount 0.924
7 1 3.8 tip_amount 1.46
8 1 5.4 tip_amount 1.83
9 1 9.3 tip_amount 3.40
10 1 9.8 tip_amount 3.60
# ℹ 51,128 more rows
Again, these commands complete instantly because all computation has been deferred until absolutely necessary (i.e., lazy eval).
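For example, the monthly comparison below joins two lazy query objects before collecting (a sketch; the mean_fares and mean_tips object names simply mirror the columns in the result):
mean_fares = nyc |>
  summarize(mean_fares = mean(fare_amount), .by = month)
mean_tips = nyc |>
  summarize(mean_tips = mean(tip_amount), .by = month)
left_join(mean_fares, mean_tips) |>
  collect()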
Joining with `by = join_by(month)`
Warning: Missing values are always removed in SQL aggregation functions.
Use `na.rm = TRUE` to silence this warning
This warning is displayed once every 8 hours.
# A tibble: 12 × 3
month mean_fares mean_tips
<dbl> <dbl> <dbl>
1 11 12.3 1.25
2 7 10.4 1.06
3 8 10.5 1.08
4 1 9.81 1.01
5 4 10.3 1.04
6 5 10.6 1.08
7 9 12.4 1.25
8 10 12.5 1.28
9 2 9.94 1.04
10 12 12.3 1.24
11 3 10.2 1.06
12 6 10.5 1.09
If you recall from the native SQL API, we sampled 1 percent of the data before creating decile bins to reduce the computational burden of sorting the entire table. Unfortunately, this approach doesn't work as well for the dplyr frontend, because the underlying SQL translation uses a generic sampling approach (rather than DuckDB's optimised USING SAMPLE statement).
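One possible workaround (a sketch) is to drop down to a raw SQL string for the sampling step, registering the sample as its own lazy table that the rest of a dplyr pipeline can use:
## Register a 1% sample as a lazy table using DuckDB's USING SAMPLE clause
nyc_samp = tbl(con, sql(
  "SELECT * FROM read_parquet('nyc-taxi/**/*.parquet', hive_partitioning = true) USING SAMPLE 1%"
))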
When going through the arrow intermediary, we don't need to establish a database connection with DBI::dbConnect() like we did above. Instead, we can create a link (pointer) to the dataset on disk directly via the arrow::open_dataset() convenience function. Here I'll assign it to a new R object called nyc2.
(For individual parquet files, we could just read them via arrow::read_parquet(), perhaps efficiently subsetting columns at the same time. But I find that open_dataset() is generally what I'm looking for.)
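A sketch of the setup (assuming the partitioned files live under the same nyc-taxi directory as before, and noting that the arrow package must be loaded):
library(arrow)
nyc2 = open_dataset("nyc-taxi")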
Note that printing our nyc2 dataset to the R console will just display the data schema. This is a cheap and convenient way to quickly interrogate the basic structure of your data, including column types, etc.
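For example:
nyc2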
FileSystemDataset with 12 Parquet files
vendor_name: string
pickup_datetime: timestamp[ms]
dropoff_datetime: timestamp[ms]
passenger_count: int64
trip_distance: double
pickup_longitude: double
pickup_latitude: double
rate_code: string
store_and_fwd: string
dropoff_longitude: double
dropoff_latitude: double
payment_type: string
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
total_amount: double
improvement_surcharge: double
congestion_surcharge: double
pickup_location_id: int64
dropoff_location_id: int64
year: int32
month: int32
The key step for this "arrow + duckdb" dplyr workflow is to pass our arrow dataset to DuckDB via the to_duckdb() function.
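For example (a sketch; printing the result just gives a lazy preview, as shown below):
nyc2 |>
  to_duckdb()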
# Source: table<arrow_001> [?? x 24]
# Database: DuckDB v0.10.1 [root@Darwin 23.4.0:R 4.4.0/:memory:]
vendor_name pickup_datetime dropoff_datetime passenger_count
<chr> <dttm> <dttm> <dbl>
1 CMT 2012-01-20 14:09:36 2012-01-20 14:42:25 1
2 CMT 2012-01-20 14:54:10 2012-01-20 15:06:55 1
3 CMT 2012-01-20 08:08:01 2012-01-20 08:11:02 1
4 CMT 2012-01-20 08:36:22 2012-01-20 08:39:44 1
5 CMT 2012-01-20 20:58:32 2012-01-20 21:03:04 1
6 CMT 2012-01-20 19:40:20 2012-01-20 19:43:43 2
7 CMT 2012-01-21 01:54:37 2012-01-21 02:08:02 2
8 CMT 2012-01-21 01:55:47 2012-01-21 02:08:51 3
9 VTS 2012-01-07 22:20:00 2012-01-07 22:27:00 2
10 VTS 2012-01-10 07:11:00 2012-01-10 07:21:00 1
# ℹ more rows
# ℹ 20 more variables: trip_distance <dbl>, pickup_longitude <dbl>,
# pickup_latitude <dbl>, rate_code <chr>, store_and_fwd <chr>,
# dropoff_longitude <dbl>, dropoff_latitude <dbl>, payment_type <chr>,
# fare_amount <dbl>, extra <dbl>, mta_tax <dbl>, tip_amount <dbl>,
# tolls_amount <dbl>, total_amount <dbl>, improvement_surcharge <dbl>,
# congestion_surcharge <dbl>, pickup_location_id <dbl>, …
Note that this transfer from Arrow to DuckDB is very quick (and memory cheap) because it is zero-copy. We are just passing around pointers instead of actually moving any data. See this blog post for more details, but the high-level takeaway is that we are benefitting from the tightly integrated architectures of these two libraries.1
At this point, all of the regular dplyr workflow logic from above should carry over. Just remember to first pass the arrow dataset via the to_duckdb() function. For example, here's our initial aggregation query again:
nyc2 |>
to_duckdb() |> ## <= key step
summarise(
mean_tip = mean(tip_amount),
.by = passenger_count
) |>
collect()
Warning: Missing values are always removed in SQL aggregation functions.
Use `na.rm = TRUE` to silence this warning
This warning is displayed once every 8 hours.
# A tibble: 18 × 2
passenger_count mean_tip
<dbl> <dbl>
1 5 1.10
2 9 0.807
3 65 0
4 177 1
5 7 0.544
6 8 0.351
7 66 1.5
8 1 1.15
9 247 2.3
10 4 0.845
11 0 0.862
12 254 0
13 249 0
14 2 1.08
15 208 0
16 10 0
17 3 0.963
18 6 1.13
Some of you may be used to performing computation with the arrow package without going through DuckDB. What's happening here is that arrow provides its own computation engine called "acero". This Arrow-native engine is actually pretty performant, albeit not as fast as DuckDB, nor as feature rich. So I personally recommend always passing to DuckDB if you can. Still, if you're curious, you can test it yourself by re-running the code chunk above but commenting out the to_duckdb() line. For more details, see here.
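A sketch of that acero variant (I've used group_by() here rather than the .by shorthand, since I'm not certain the latter is supported by arrow's dplyr bindings):
nyc2 |>
  # to_duckdb() |>  ## commented out, so computation falls back to arrow's acero engine
  group_by(passenger_count) |>
  summarise(mean_tip = mean(tip_amount)) |>
  collect()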
The new kid on the block is duckplyr (announcement / homepage). Without going into too much depth, the promise of duckplyr is that it can provide a "fully native" dplyr experience that is directly coupled to DuckDB's query engine. So, for example, it won't have to rely on dbplyr's generic SQL translations. Instead, the relevant dplyr "verbs" are translated directly to DuckDB's relational API to construct logical query plans. If that's too much jargon, just know that it should involve less overhead, fewer translation errors, and better optimization. Moreover, a goal of duckplyr is to be a drop-in replacement for dplyr in general. In other words, you could just swap out library(dplyr) for library(duckplyr) and all of your data wrangling operations will be backed by the power of DuckDB. This includes working on "regular" R data frames in memory.
All of this is exciting and I would urge you to stay tuned. Right now, duckplyr is still marked as experimental and has a few rough edges. But the basics are there. For example:
library(duckplyr, warn.conflicts = FALSE)
duckplyr_df_from_parquet("nyc-taxi/**/*.parquet") |>
summarise(
mean_tip = mean(tip_amount),
.by = passenger_count
)
materializing:
---------------------
--- Relation Tree ---
---------------------
Aggregate [passenger_count, mean(tip_amount)]
read_parquet(nyc-taxi/**/*.parquet)
---------------------
-- Result Columns --
---------------------
- passenger_count (BIGINT)
- mean_tip (DOUBLE)
passenger_count mean_tip
1 2 1.0815798
2 208 0.0000000
3 10 0.0000000
4 5 1.1027325
5 9 0.8068000
6 65 0.0000000
7 177 1.0000000
8 3 0.9629494
9 6 1.1283649
10 0 0.8620988
11 249 0.0000000
12 254 0.0000000
13 8 0.3507692
14 7 0.5441176
15 66 1.5000000
16 1 1.1510110
17 247 2.3000000
18 4 0.8445190
“Similar” might be a better description than “integrated”, since DuckDB does not use the Arrow memory model. But they are both column-oriented (among other things), and so the end result is pretty seamless integration.↩︎