library(arrow)
#>
#> Attaching package: 'arrow'
#> The following object is masked from 'package:utils':
#>
#> timestamp
library(duckdb)
#> Loading required package: DBI
library(dplyr)
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>
#> filter, lag
#> The following objects are masked from 'package:base':
#>
#> intersect, setdiff, setequal, union
library(tictoc)
# One-time data download (commented out): mirror the public taxi bucket into a
# local "nyc-taxi" directory, then play a sound when the copy finishes.
# arrow::copy_files("s3://ursa-labs-taxi-data", "nyc-taxi")
# beepr::beep()

# Local copy of the taxi data. The first two directory levels below it are
# exposed as the `year` and `month` columns of the dataset.
taxi_path <- "~/dev/personal/taxi/nyc-taxi/"
ds <- open_dataset(taxi_path, partitioning = c("year", "month"))

# One lazily-filtered view of the dataset per year to compare.
dat_16 <- ds |> filter(year == 2016)
dat_19 <- ds |> filter(year == 2019)

# DuckDB connection backed by the database file "db_temp".
con <- DBI::dbConnect(duckdb::duckdb(), dbdir = "db_temp")

# Make both Arrow views queryable from SQL under the same names they have in R.
duckdb_register_arrow(con, "dat_16", dat_16)
duckdb_register_arrow(con, "dat_19", dat_19)
duckdb_list_arrow(con)
#> [1] "dat_16" "dat_19"
# For every 2016 trip, find the 2019 trips from the same vendor and month that
# share the exact pickup coordinates, charged a fare no higher than the 2016
# fare, and covered a distance no shorter than the 2016 distance.
# The LEFT JOIN keeps 2016 trips with no such partner; their 2019 columns come
# back as NA.
join_sql <- "
SELECT
d16.vendor_id,
d16.month,
d16.passenger_count AS p_count_16,
d19.passenger_count AS p_count_19,
d16.fare_amount AS amt_16,
d19.fare_amount AS amt_19,
d16.trip_distance AS dist_16,
d19.trip_distance AS dist_19
FROM
dat_16 d16
LEFT JOIN
dat_19 d19
ON
d16.vendor_id = d19.vendor_id AND
d16.month = d19.month AND
d16.pickup_longitude = d19.pickup_longitude AND
d16.pickup_latitude = d19.pickup_latitude AND
d16.fare_amount >= d19.fare_amount AND
d16.trip_distance <= d19.trip_distance;
"

# `arrow = TRUE` sets the result up to be fetched as an Arrow record batch
# reader in the next step.
tic("send query")
res <- dbSendQuery(con, join_sql, arrow = TRUE)
toc()
#> send query: 1.053 sec elapsed
# Wrap the pending DuckDB result in an Arrow RecordBatchReader. This returns
# almost instantly, which suggests rows are only produced when the reader is
# actually read (see the timing of the next step).
tic("fetch")
res_batch <- res |> duckdb_fetch_record_batch()
toc()
#> fetch: 0.005 sec elapsed
# Bring only the first six joined rows into R as a tibble. Nearly all of the
# script's runtime is spent here (about 13 s, versus about 1 s to send the
# query).
# NOTE(review): presumably DuckDB must build the join's hash table over all of
# dat_19 before it can emit the first row; confirm with EXPLAIN ANALYZE.
# NOTE(review): res_batch is already an Arrow RecordBatchReader, so to_arrow()
# is likely a pass-through here; confirm before removing it.
tic("open and see head")
to_arrow(res_batch) |> head() |> collect()
#> # A tibble: 6 × 8
#> vendor_id month p_count_16 p_count_19 amt_16 amt_19 dist_16 dist_19
#> <chr> <int> <int> <int> <dbl> <dbl> <dbl> <dbl>
#> 1 2 1 2 NA 7.5 NA 1.10 NA
#> 2 2 1 5 NA 18 NA 4.90 NA
#> 3 2 1 1 NA 33 NA 10.5 NA
#> 4 2 1 1 NA 16.5 NA 4.75 NA
#> 5 2 1 3 NA 8 NA 1.76 NA
#> 6 2 1 2 NA 19 NA 5.52 NA
toc()
#> open and see head: 13.465 sec elapsed

# Release DuckDB resources explicitly instead of leaving them to garbage
# collection. First close the pending result, then the connection.
# `shutdown = TRUE` also shuts down the database instance behind the
# "db_temp" file.
dbClearResult(res)
dbDisconnect(con, shutdown = TRUE)
# Created on 2022-07-27 by the reprex package (v2.0.1)