Skip to content

Instantly share code, notes, and snippets.

@BenjaminWolfe
Last active June 26, 2022 11:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save BenjaminWolfe/0b23c5a59a4f008fee9cc54dd91c91e8 to your computer and use it in GitHub Desktop.
Save BenjaminWolfe/0b23c5a59a4f008fee9cc54dd91c91e8 to your computer and use it in GitHub Desktop.
collect_iteratively <- function(x, size = 500, timeout = 3600) {
start_time <- Sys.time()
message("pulling ", size, " records at a time...")
message("starting at ", start_time)
message("will time out at ", start_time + timeout, " (", timeout, "s later)")
con <- x$src$con
sql <- dbplyr::db_sql_render(con, x)
res <- DBI::dbSendQuery(con, sql)
out <- list()
idx <- 0
rws <- NA
tryCatch({
while (Sys.time() <= start_time + timeout) {
idx <- idx + 1
message(
"batch ", idx, ", ",
"starting ", Sys.time(), "; ",
"previous batch had ", rws, " records"
)
this_fetch <- DBI::dbFetch(res, n = size)
out[[idx]] <- dplyr::grouped_df(
this_fetch,
intersect(
dbplyr::op_grps(x),
names(this_fetch)
)
)
rws <- nrow(out[[idx]])
if (nrow(out[[idx]]) < size) break
}
}, finally = {
DBI::dbClearResult(res)
})
dplyr::bind_rows(out)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment