@rkingdc
Created February 28, 2019 20:39
rkdc-blog: Binning Columns in Remote Tables with dplyr and rquery
db_dplyr_join_fn <- function(data, tbl_cuts, column_to_cut = "column_to_cut"){
  # for each id, find the smallest cut value that is >= the value being binned
  bin_choices <- data %>%
    select(., id, k_dummy, !!rlang::sym(column_to_cut)) %>%
    left_join(., tbl_cuts, by = 'k_dummy') %>%
    filter(., cut >= !!rlang::sym(column_to_cut)) %>%
    group_by(., id) %>%
    summarise(., cut_ = min(cut, na.rm = TRUE))
  # join the chosen bin back onto the original data and materialize the result
  return(dplyr::compute(dplyr::left_join(data, bin_choices, by = 'id')))
}
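# A hedged usage sketch (not part of the original gist): assuming a live DBI
# connection `db` and the 'data' and 'tmp' tables as written by run_benchmark()
# below, the dplyr join method is called directly on the remote table handles.
dplyr_data <- dplyr::tbl(db, 'data')
dplyr_cuts <- dplyr::tbl(db, 'tmp')
binned <- db_dplyr_join_fn(data = dplyr_data, tbl_cuts = dplyr_cuts)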
library(dplyr)
library(rquery)
library(DBI)
library(RPostgres)
library(microbenchmark)
db <- dbConnect(Postgres())
pg_bm1e2.3 <- run_benchmark(nrows=100, ncuts=3, db=db, times=30)
pg_bm1e3.3 <- run_benchmark(nrows=1e3, ncuts=3, db=db, times=30)
pg_bm1e4.3 <- run_benchmark(nrows=1e4, ncuts=3, db=db, times=30)
pg_bm1e3.10 <- run_benchmark(nrows=1e3, ncuts=10, db=db, times=30)
pg_bm1e3.100 <- run_benchmark(nrows=1e3, ncuts=100, db=db, times=30)
pg_bm1e3.1000 <- run_benchmark(nrows=1e3, ncuts=1000, db=db, times=30)
pg_bm1e3.100.idx <- run_benchmark(nrows=1e3, ncuts=100, db=db, times=30, add_index=TRUE)
pg_bm1e4.1000.idx <- run_benchmark(nrows=1e4, ncuts=1000, db=db, times=30, add_index=TRUE)
dbDisconnect(db)
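# A hedged sketch of inspecting the results (not in the original gist):
# microbenchmark objects support summary(), and with ggplot2 installed,
# autoplot() gives a quick visual comparison of the three methods.
summary(pg_bm1e3.100)
ggplot2::autoplot(pg_bm1e3.100)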
.make_case_when_fn <- function(column_name, cut_vector){
  # get the column name as a symbol for tidy evaluation
  s_column_name <- rlang::sym(column_name)
  # the vector usually won't have names, but if it does, use those names as
  # bin labels instead of the canned ones, then strip them from the vector
  if (!is.null(names(cut_vector))){
    cut_names <- names(cut_vector)
    cut_vector <- unname(cut_vector)
  } else {
    cut_names <- cut_vector
  }
  # construct the list of two-sided formulas that case_when() needs
  case_expr <- lapply(c(0, seq_along(cut_vector)), function(i){
    if (i == 0){
      lab <- sprintf('%s', cut_names[i+1]) # a label
      rlang::expr(!!s_column_name <= cut_vector[!!i+1] ~ !!lab) # the expression
    } else if (i == length(cut_vector)) {
      lab <- sprintf('>%s', cut_names[i])
      rlang::expr(!!s_column_name > cut_vector[!!i] ~ !!lab)
    } else {
      lab <- sprintf('%s', cut_names[i+1])
      rlang::expr(!!s_column_name > cut_vector[!!i] & !!s_column_name <= cut_vector[!!i+1] ~ !!lab)
    }
  })
  # return a function that applies the case_when binning and materializes it
  return(function(data){
    dplyr::compute(dplyr::mutate(data, cut_ := dplyr::case_when(!!!case_expr)))
  })
}
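# A minimal illustration (an assumption-laden sketch, not in the original
# gist): with cuts at 10 and 20, the factory builds case_when() arms of the
# form
#   column_to_cut <= cut_vector[1] ~ "10"
#   column_to_cut > cut_vector[1] & column_to_cut <= cut_vector[2] ~ "20"
#   column_to_cut > cut_vector[2] ~ ">20"
# and returns a function that applies them to a remote tbl.
example_bin_fn <- .make_case_when_fn(column_name = 'column_to_cut',
                                     cut_vector = c(10, 20))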
make_data <- function(nrows){
  # random integer data to bin, plus an id and a constant dummy key for joins
  df <- data.frame(column_to_cut = sample.int(10000, nrows, replace=TRUE),
                   id = seq(nrows),
                   k_dummy = 1)
  df
}
.make_db_rquery_join_fn <- function(data, tbl_cuts, db=db, column_to_cut = 'column_to_cut'){
  # test rquery connection options
  dbopts <- rquery::rq_connection_tests(db)
  # create an rquery connection object with those options
  rqdb <- rquery::rquery_db_info(connection = db,
                                 is_dbi = TRUE,
                                 connection_options = dbopts)
  # for each id, find the smallest cut value that is >= the value being binned
  bin_choices <- data %.>%
    rquery::select_columns(., c("id", as.character(column_to_cut), 'k_dummy')) %.>%
    rquery::natural_join(., tbl_cuts, jointype = "LEFT", by = 'k_dummy') %.>%
    rquery::select_rows_se(., qe(cut >= column_to_cut)) %.>%
    rquery::project(., groupby = "id", cut_ = min(cut))
  # join the chosen bin back onto the original data
  annotation_ops <- rquery::natural_join(data, bin_choices, jointype = "LEFT", by = "id")
  FUN <- function(){
    # the operation tree will work as long as the table and column names match;
    # it's even database agnostic
    rquery::materialize(db = rqdb, optree = annotation_ops)
  }
  return(FUN)
}
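# Aside (a hedged sketch, not part of the original gist): because rquery
# composes the whole operation tree before touching the database, the SQL it
# will run can be inspected with rquery::to_sql() without executing anything.
# This assumes the 'data' table already exists on the connection `db`.
ops <- rquery::db_td(db, 'data') %.>%
  rquery::select_columns(., c('id', 'column_to_cut', 'k_dummy'))
cat(rquery::to_sql(ops, rquery::rquery_db_info(connection = db, is_dbi = TRUE)))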
run_benchmark <- function(nrows, ncuts, db, times=100, add_index=FALSE){
  ## setup ##
  d <- make_data(nrows)
  # can't add indexes to temporary tables, so write a permanent one when indexing
  dbWriteTable(db, 'data', d, overwrite=TRUE, temporary=!add_index)
  # for testing how indexes affect performance
  if(add_index){
    DBI::dbExecute(db, 'CREATE INDEX idx ON data (column_to_cut)')
    DBI::dbExecute(db, 'ANALYZE data')
  }
  # dplyr and rquery both have remote table pointers
  dplyr_data <- dplyr::tbl(db, 'data')
  rq_data <- rquery::db_td(db, 'data')
  # cut_vector based on the number of cuts we want and the range of the random
  # data; drop the endpoints so only interior cut points remain
  cut_vector <- seq(from=min(d$column_to_cut), to=max(d$column_to_cut), length.out=ncuts+2)
  cut_vector <- cut_vector[-c(1, length(cut_vector))]
  # write the cuts table to the database as well
  tmpdat <- data.frame(cut = cut_vector, k_dummy=1)
  DBI::dbWriteTable(db,
                    'tmp', tmpdat,
                    temporary=TRUE, overwrite=TRUE)
  dplyr_cuts <- dplyr::tbl(db, 'tmp')
  rq_cuts <- rquery::db_td(db, 'tmp')
  # create binning functions for case_when and rquery;
  # the dplyr join method doesn't require any function setup -- it's a simple expression
  db_case_when_case_fn <- .make_case_when_fn(column_name = 'column_to_cut', cut_vector=cut_vector)
  db_rquery_join_fn <- .make_db_rquery_join_fn(data=rq_data, tbl_cuts=rq_cuts,
                                               db=db, column_to_cut = 'column_to_cut')
  mb <- microbenchmark(
    case_when=db_case_when_case_fn(dplyr_data),
    dplyr_join=db_dplyr_join_fn(data = dplyr_data, tbl_cuts=dplyr_cuts),
    rquery_join=db_rquery_join_fn(),
    times=times
  )
  if (add_index){
    # the indexed table is permanent rather than temporary, so drop it explicitly
    DBI::dbExecute(db, 'DROP TABLE data')
  }
  return(mb)
}