Created
February 28, 2019 20:39
-
-
Save rkingdc/fa26775528e197e2ba2d6c7bdda66a35 to your computer and use it in GitHub Desktop.
rkdc-blog: Binning Columns in Remote Tables with dplyr and rquery
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Bin a column of a (remote) table by joining it against a table of cut points.
#
# data:          table with columns `id`, `k_dummy` and the column to bin.
# tbl_cuts:      table with columns `cut` and `k_dummy`.
# column_to_cut: name (string) of the column in `data` to bin.
#
# Returns `data` left-joined with a `cut_` column holding, per `id`, the
# smallest cut point that is >= the row's value, materialised with
# dplyr::compute(). Rows whose value exceeds every cut point have no match in
# the per-id summary, so the final left join leaves their `cut_` missing.
db_dplyr_join_fn <- function(data, tbl_cuts, column_to_cut = "column_to_cut") {
  target_col <- rlang::sym(column_to_cut)

  # Pair every row with every cut point via the constant `k_dummy` key.
  candidates <- dplyr::select(data, id, k_dummy, !!target_col)
  candidates <- dplyr::left_join(candidates, tbl_cuts, by = 'k_dummy')

  # Keep only cut points at or above the value, then take the smallest per id.
  candidates <- dplyr::filter(candidates, cut >= !!target_col)
  smallest_cut <- dplyr::summarise(
    dplyr::group_by(candidates, id),
    cut_ = min(cut, na.rm = TRUE)
  )

  dplyr::compute(dplyr::left_join(data, smallest_cut, by = 'id'))
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Packages used by the benchmark helpers.
library(dplyr)
library(rquery)
library(DBI)
library(RPostgres)
library(microbenchmark)

# Open a Postgres connection; no explicit connection arguments are passed.
db <- dbConnect(Postgres())

# Vary the number of rows with the number of cuts held at 3.
pg_bm1e2.3 <- run_benchmark(nrows = 100, ncuts = 3, db = db, times = 30)
pg_bm1e3.3 <- run_benchmark(nrows = 1e3, ncuts = 3, db = db, times = 30)
pg_bm1e4.3 <- run_benchmark(nrows = 1e4, ncuts = 3, db = db, times = 30)

# Vary the number of cuts with the number of rows held at 1e3.
pg_bm1e3.10 <- run_benchmark(nrows = 1e3, ncuts = 10, db = db, times = 30)
pg_bm1e3.100 <- run_benchmark(nrows = 1e3, ncuts = 100, db = db, times = 30)
pg_bm1e3.1000 <- run_benchmark(nrows = 1e3, ncuts = 1000, db = db, times = 30)

# Repeat two configurations with an index on the binned column.
pg_bm1e3.100.idx <- run_benchmark(
  nrows = 1e3, ncuts = 100, db = db, times = 30, add_index = TRUE
)
pg_bm1e4.1000.idx <- run_benchmark(
  nrows = 1e4, ncuts = 1000, db = db, times = 30, add_index = TRUE
)

dbDisconnect(db)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Build a function that bins `column_name` with a dplyr::case_when() expression.
#
# column_name: name (string) of the column to bin.
# cut_vector:  non-empty vector of cut points in ascending order. If it has
#              names, the names are used as bin labels; otherwise the cut
#              values themselves are used.
#
# Returns a function of one argument (`data`) that adds a `cut_` column and
# materialises the result with dplyr::compute(). Each bin is labelled by its
# upper bound; values above the last cut point are labelled '>max'.
.make_case_when_fn <- function(column_name, cut_vector) {
  if (length(cut_vector) == 0L) {
    stop("`cut_vector` must contain at least one cut point", call. = FALSE)
  }

  s_column_name <- rlang::sym(column_name)

  # the vector shouldn't have names, but if it has them, use those names
  # instead of the canned ones, then drop the names
  if (!is.null(names(cut_vector))) {
    cut_names <- names(cut_vector)
    cut_vector <- unname(cut_vector)
  } else {
    cut_names <- cut_vector
  }
  n_cuts <- length(cut_vector)

  # One two-sided formula per bin. The bounds are injected as values (`!!`)
  # rather than as `cut_vector[i]` lookups, so the expressions are
  # self-contained and do not depend on `cut_vector` being resolvable (or
  # translatable to SQL) when the mutate() call is finally evaluated.
  case_expr <- lapply(c(0, seq_len(n_cuts)), function(i) {
    if (i == 0) {
      # first bin: everything up to and including the first cut point
      lab <- sprintf('%s', cut_names[i + 1])
      upper <- cut_vector[[i + 1]]
      rlang::expr(!!s_column_name <= !!upper ~ !!lab)
    } else if (i == n_cuts) {
      # overflow bin; the label is a constant, so no sprintf() argument is
      # passed (an unused argument triggers a warning in newer R versions)
      lab <- '>max'
      lower <- cut_vector[[i]]
      rlang::expr(!!s_column_name > !!lower ~ !!lab)
    } else {
      # interior bin (lower, upper], labelled by its upper bound
      lab <- sprintf('%s', cut_names[i + 1])
      lower <- cut_vector[[i]]
      upper <- cut_vector[[i + 1]]
      rlang::expr(
        !!s_column_name > !!lower & !!s_column_name <= !!upper ~ !!lab
      )
    }
  })

  # return the binning function
  function(data) {
    dplyr::compute(dplyr::mutate(data, cut_ := dplyr::case_when(!!!case_expr)))
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Generate random benchmark data.
#
# nrows: number of rows to generate (0 is allowed).
#
# Returns a data.frame with columns:
#   column_to_cut - integers sampled with replacement from 1..10000
#   id            - row identifier 1..nrows
#   k_dummy       - constant 1, used as a cross-join key against the cut table
make_data <- function(nrows) {
  data.frame(
    # Named `column_to_cut` because the index creation, cut-vector
    # computation and binning functions all look the column up by that name.
    column_to_cut = sample.int(10000, nrows, replace = TRUE),
    # seq_len() and rep() keep every column at length `nrows`, including 0.
    id = seq_len(nrows),
    k_dummy = rep(1, nrows)
  )
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Build a zero-argument function that bins a column with an rquery join.
#
# data:          rquery table description with columns `id`, `k_dummy` and
#                the column to bin.
# tbl_cuts:      rquery table description with columns `cut` and `k_dummy`.
# db:            DBI connection. Required: the former default `db = db`
#                referred to itself and could only fail with "promise already
#                under evaluation", so dropping it breaks no working call.
# column_to_cut: name (string) of the column in `data` to bin.
#
# Returns a function that, when called, materialises `data` left-joined with
# a `cut_` column holding the smallest cut point >= the row's value.
.make_db_rquery_join_fn <- function(data, tbl_cuts, db,
                                    column_to_cut = 'column_to_cut') {
  # test rquery connection options
  dbopts <- rquery::rq_connection_tests(db)
  # create the rquery connection description
  rqdb <- rquery::rquery_db_info(
    connection = db,
    is_dbi = TRUE,
    connection_options = dbopts
  )

  column_to_cut <- as.character(column_to_cut)
  # Build the row filter from the argument so a column other than the
  # default is honoured (previously the literal name was hard-coded).
  keep_rows <- paste0("cut >= ", column_to_cut)

  bin_choices <- data %.>%
    rquery::select_columns(., c("id", column_to_cut, 'k_dummy')) %.>%
    rquery::natural_join(., tbl_cuts, jointype = "LEFT", by = 'k_dummy') %.>%
    rquery::select_rows_se(., keep_rows) %.>%
    rquery::project(., groupby = "id", cut_ = min(cut))

  annotation_ops <- rquery::natural_join(
    data, bin_choices,
    jointype = "LEFT", by = "id"
  )

  # The operator tree is built once above; the returned closure only
  # executes it against the database.
  function() {
    rquery::materialize(db = rqdb, optree = annotation_ops)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Benchmark three ways of binning a column of a database table.
#
# nrows:     number of rows of random data to generate.
# ncuts:     number of interior cut points.
# db:        DBI connection.
# times:     number of repetitions passed to microbenchmark().
# add_index: if TRUE, write `data` as a permanent table, index the binned
#            column, and drop the table again when the function exits.
#
# Returns the microbenchmark result comparing `case_when`, `dplyr_join` and
# `rquery_join`.
run_benchmark <- function(nrows, ncuts, db, times = 100, add_index = FALSE) {
  ## setup ##
  d <- make_data(nrows)
  if (!"column_to_cut" %in% names(d)) {
    stop("make_data() must return a `column_to_cut` column", call. = FALSE)
  }

  # the table is only written as temporary when no index is requested
  DBI::dbWriteTable(db, 'data', d, overwrite = TRUE, temporary = !add_index)

  # for testing how indexes affect performance
  if (add_index) {
    # The table is permanent in this case, so make sure it is dropped even
    # if a later step or the benchmark itself fails.
    on.exit(DBI::dbExecute(db, 'DROP TABLE data'), add = TRUE)
    DBI::dbExecute(db, 'CREATE INDEX idx ON data (column_to_cut)')
    DBI::dbExecute(db, 'ANALYZE data')
  }

  # dplyr and rquery both have remote table pointers
  dplyr_data <- dplyr::tbl(db, 'data')
  rq_data <- rquery::db_td(db, 'data')

  # cut_vector is based on the number of cuts and the range of the data;
  # the two end points of the sequence are dropped to keep interior cuts only
  values <- d[["column_to_cut"]]
  cut_vector <- seq(from = min(values), to = max(values), length.out = ncuts + 2)
  cut_vector <- cut_vector[-c(1, length(cut_vector))]

  # write the cut points to the database as a temporary table
  tmpdat <- data.frame(cut = cut_vector, k_dummy = 1)
  DBI::dbWriteTable(db, 'tmp', tmpdat, temporary = TRUE, overwrite = TRUE)
  dplyr_cuts <- dplyr::tbl(db, 'tmp')
  rq_cuts <- rquery::db_td(db, 'tmp')

  # create binning functions for case_when and rquery; the dplyr join method
  # needs no function setup because it is a plain expression
  db_case_when_case_fn <- .make_case_when_fn(
    column_name = 'column_to_cut',
    cut_vector = cut_vector
  )
  db_rquery_join_fn <- .make_db_rquery_join_fn(
    data = rq_data, tbl_cuts = rq_cuts,
    db = db, column_to_cut = 'column_to_cut'
  )

  microbenchmark::microbenchmark(
    case_when = db_case_when_case_fn(dplyr_data),
    dplyr_join = db_dplyr_join_fn(data = dplyr_data, tbl_cuts = dplyr_cuts),
    rquery_join = db_rquery_join_fn(),
    times = times
  )
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment