@AdamSpannbauer
Last active June 11, 2018 13:43
A function factory that creates a generator function for fetching the results of a large SQL query in chunks (written for use with SQL Server).
create_query_chunker = function(query,
                                dbi_connection,
                                post_process_func = NULL,
                                chunk_size = 5000L,
                                debug_print = FALSE) {
  #' @title Create a chunked query fetcher
  #'
  #' @description Creates a generator function that can be called repeatedly
  #' to return the next n rows of the query (where n = \code{chunk_size})
  #'
  #' @param query string: the SQL query to run
  #' @param dbi_connection connection: result of \code{DBI::dbConnect}
  #' @param post_process_func function: an optional function applied to each chunk of query results
  #' @param chunk_size int: number of rows the returned function should fetch at a time
  #' @param debug_print bool: if \code{TRUE} each generated query is printed to the console
  #'
  #' @details \code{query} must end with an ORDER BY clause (SQL Server only allows
  #' OFFSET-FETCH as part of ORDER BY) and must not include its own OFFSET-FETCH
  #' clause (this is appended automatically)
  #'
  #' @return a function that can be called to fetch the input query in chunks
  #'
  #' @examples
  #' # define dbi connection
  #' connection_string = "my connection string"
  #' con = DBI::dbConnect(drv = odbc::odbc(),
  #'                      .connection_string = connection_string)
  #'
  #' query = "
  #'   SELECT *
  #'   FROM my_table
  #'   ORDER BY my_column"
  #'
  #' # init a query chunker
  #' fetch_next_chunk = create_query_chunker(query,
  #'                                         dbi_connection = con,
  #'                                         chunk_size = 100L,
  #'                                         debug_print = TRUE)
  #' fetch_next_chunk() # fetch rows 1-100
  #' fetch_next_chunk() # fetch rows 101-200
  #'
  # append the chunking clauses to the user's query
  chunk_base_query = paste0(query, '\n',
                            'OFFSET {offset_size} ROWS\n',
                            'FETCH NEXT {chunk_size} ROWS ONLY')
  # init a chunk counter; keep it an integer so the offset is never
  # rendered in scientific notation (e.g. 1e+05) inside the query text
  chunk_i = 0L
  chunk_size = as.integer(chunk_size)
  # create closure to be returned as a chunk fetcher
  chunk_generator = function() {
    # calc offset for this iteration
    offset_size = chunk_size * chunk_i
    # fill in offset and chunk size
    query_i = glue::glue(chunk_base_query)
    # print generated query for debug purposes
    if (debug_print) {
      cat('\nQUERY:',
          '\n-------------\n',
          query_i,
          '\n-------------\n\n',
          sep = '')
    }
    # exec query and store results
    query_result = DBI::dbGetQuery(conn = dbi_connection,
                                   statement = query_i)
    # apply post processing func if provided
    if (is.function(post_process_func)) {
      query_result = post_process_func(query_result)
    }
    # increment chunk counter (in the enclosing environment)
    chunk_i <<- chunk_i + 1L
    return(query_result)
  }
  return(chunk_generator)
}
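
The returned function has no built-in "done" signal, so a caller has to decide when to stop. A minimal sketch of one way to drain every chunk follows. It assumes the fetcher returns a zero-row data frame once the offset passes the last row, which holds only if any `post_process_func` leaves an empty result empty. `make_fake_chunker` and `fetch_all_chunks` are hypothetical names, not part of the gist. The fake chunker serves slices of an in-memory data frame so the loop can run without a database.

```r
# Hypothetical stand-in for the function returned by create_query_chunker():
# it serves successive slices of a data frame instead of querying a database.
make_fake_chunker = function(df, chunk_size = 100L) {
  chunk_i = 0L
  function() {
    offset_size = chunk_size * chunk_i
    chunk_i <<- chunk_i + 1L
    rows = seq_len(nrow(df))
    # rows past the end of the table yield a zero-row data frame
    keep = rows > offset_size & rows <= offset_size + chunk_size
    df[keep, , drop = FALSE]
  }
}

# Hypothetical helper: call the fetcher until it returns no rows,
# then bind the collected chunks into one data frame.
fetch_all_chunks = function(fetch_next_chunk, max_chunks = 1000L) {
  chunks = list()
  for (i in seq_len(max_chunks)) {
    chunk = fetch_next_chunk()
    if (nrow(chunk) == 0L) break
    chunks[[length(chunks) + 1L]] = chunk
  }
  do.call(rbind, chunks)
}

fake_table = data.frame(id = 1:250, value = sqrt(1:250))
fetch_next_chunk = make_fake_chunker(fake_table, chunk_size = 100L)
all_rows = fetch_all_chunks(fetch_next_chunk)
```

With a real connection, `fetch_next_chunk` would instead come from `create_query_chunker(query, dbi_connection = con)`. The `max_chunks` cap is only a guard against looping forever if the stop condition is never met.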