Skip to content

Instantly share code, notes, and snippets.

@mskyttner
Created February 14, 2021 19:46
Show Gist options
  • Save mskyttner/5140bcd074b844263de5c3a2bef4bded to your computer and use it in GitHub Desktop.
Save mskyttner/5140bcd074b844263de5c3a2bef4bded to your computer and use it in GitHub Desktop.
Load .tsv file into duckdb in chunkwise fashion using R and vroom
library(dplyr)
library(duckdb)
library(vroom)
#' Query the version of the duckdb engine behind the R package
#'
#' Opens a throw-away in-memory connection, runs `PRAGMA version;` and parses
#' the first "major.minor.patch" triple found in the first value of the result.
#'
#' @return An integer vector of length 3 (major, minor, patch). All elements
#'   are `NA` when no "x.y.z" pattern is found in the version string.
duckdb_version <- function() {
  con <- DBI::dbConnect(duckdb::duckdb())
  on.exit(DBI::dbDisconnect(con, shutdown = TRUE), add = TRUE)

  res <- DBI::dbGetQuery(con, "PRAGMA version;")

  parse_semver <- function(x) {
    # `(\\d+)` keeps the whole number; `(\\d)+` would keep only the last
    # digit of a multi-digit major version (e.g. "10" -> 0).
    re <- "(\\d+)\\.(\\d+)\\.(\\d+)"
    # element 1 is the full match, elements 2:4 are the three capture groups
    as.integer(regmatches(x, regexec(re, x))[[1]][2:4])
  }

  # Parse one string only: first column, first row of the PRAGMA result
  # (presumably the library version column; verify against the duckdb docs).
  parse_semver(as.character(res[[1]][1]))
}
#' Guess the column specification of a delimited file with vroom
#'
#' Reads the first `n_lookahead` rows with `vroom::vroom()`, takes the column
#' specification vroom guessed for them and derives several compact string
#' representations of it.
#'
#' @param file Path of the delimited file, passed on to `vroom::vroom()`.
#' @param n_lookahead Maximum number of rows to read for the guess.
#' @return A list with elements
#'   * `fields`: the column names of the guessed spec,
#'   * `coltypes`: the spec object returned by `vroom::spec()`,
#'   * `concise`: one letter per column (first letter after "collector_" in
#'     the collector's class name), pasted into one string,
#'   * `condensed`: the same, for the columns left after
#'     `vroom::cols_condense()`,
#'   * `default`: the letter for the condensed spec's default collector,
#'   * `delim`: the `delim` element of the condensed spec.
guess_colspec_concise <- function(file, n_lookahead = 100) {
  message("Column specifications guess from vroom for ", file)

  # Collapse a list of collectors into a string holding one letter each,
  # e.g. class "collector_double" contributes "d".
  abbreviate_collectors <- function(collectors) {
    # every collector is expected to carry exactly two classes; row 1 holds
    # the specific "collector_<type>" class
    class_matrix <- vapply(collectors, class, character(2))
    primary_class <- class_matrix[1, ]
    first_letters <- gsub(pattern = "^collector_(.{1}).*$", "\\1", primary_class)
    paste(first_letters, collapse = "")
  }

  preview <- vroom::vroom(file, n_max = n_lookahead)
  full_spec <- vroom::spec(preview)
  compact_spec <- vroom::cols_condense(full_spec)

  list(
    fields = names(full_spec$cols),
    coltypes = full_spec,
    concise = abbreviate_collectors(full_spec$cols),
    condensed = abbreviate_collectors(compact_spec$cols),
    default = abbreviate_collectors(list(.default = compact_spec$default)),
    delim = compact_spec$delim
  )
}
#' Read a delimited file chunk by chunk with vroom and process each chunk
#'
#' Repeatedly calls `vroom::vroom()` with an increasing `skip` and a fixed
#' `n_max`, hands every non-empty chunk to `chunk_callback_fn` and records
#' timing and memory statistics per chunk. Reading stops after the first chunk
#' that holds fewer than `n_chunksize` rows.
#'
#' @param file Path of the delimited file. Its first line is skipped as the
#'   header; column names are taken from `colspec$fields`.
#' @param n_chunksize Number of rows to read per chunk.
#' @param chunk_callback_fn Function called with each non-empty chunk as its
#'   single argument.
#' @param colspec List with elements `delim`, `fields` and `coltypes`, as
#'   returned by `guess_colspec_concise()`.
#' @param encoding File encoding. Used to build a `vroom::locale()` unless a
#'   `locale` argument is supplied through `...`.
#' @param ... Further arguments passed on to `vroom::vroom()`.
#' @return A data frame with one row per chunk: `file`, `chunk`, `beg`, `end`
#'   (the nominal end, `beg + n_chunksize`), `is_done`, `nrow`, `t_read` and
#'   `t_write` (difftimes in seconds), `mem_chunk`, `mem_used`, `free_disk`.
read_vroom_chunks <- function(file, n_chunksize = 5e5, chunk_callback_fn,
                              colspec = guess_colspec_concise(file),
                              encoding = Sys.getenv("encoding", "UTF-8"), ...) {
  stopifnot(
    is.function(chunk_callback_fn),
    is.numeric(n_chunksize), length(n_chunksize) == 1L, n_chunksize >= 1
  )

  # Human readable size. Kept as double: as.integer() turns anything of
  # 2 GiB or more into NA.
  format_bytes <- function(x) {
    format(structure(as.numeric(x), class = "object_size"), units = "auto")
  }

  # Free space on the root file system as reported by `df -h`; NA where the
  # shell tools are missing or print nothing. Only the first line is kept so
  # that every chunk yields exactly one row of statistics.
  free_disk_root <- function() {
    out <- tryCatch(
      system("df -h | grep '/$' | awk '{print $4}'", intern = TRUE),
      error = function(e) character(0),
      warning = function(w) character(0)
    )
    if (length(out) == 0L) NA_character_ else out[[1]]
  }

  # Honour `encoding` unless the caller already supplies a complete locale.
  vroom_args <- list(...)
  if (is.null(vroom_args[["locale"]]) && nzchar(encoding)) {
    vroom_args[["locale"]] <- vroom::locale(encoding = encoding)
  }

  i <- 0
  is_done <- FALSE
  stats <- data.frame()

  while (!is_done) {
    t0 <- Sys.time()
    # skip the header line plus all rows delivered by earlier chunks
    beg <- 1 + i * n_chunksize
    chunk <- do.call(
      vroom::vroom,
      c(
        list(
          file = file, delim = colspec$delim,
          col_names = colspec$fields, col_types = colspec$coltypes$cols,
          skip = beg, n_max = n_chunksize
        ),
        vroom_args
      )
    )

    # stats for the chunk
    i <- i + 1
    n <- nrow(chunk)
    is_done <- n < n_chunksize
    mem <- pryr::object_size(chunk)
    mu <- pryr::mem_used()
    fd <- free_disk_root()
    t1 <- Sys.time()
    # fixed units, so that sums and means over chunks never mix secs and mins
    t_read <- difftime(t1, t0, units = "secs")

    # process the chunk
    if (n > 0) chunk_callback_fn(chunk)
    t_write <- difftime(Sys.time(), t1, units = "secs")

    # update stats
    statz <- tibble::tibble(file,
      chunk = i, beg = beg, end = beg + n_chunksize,
      is_done, nrow = n, t_read, t_write,
      mem_chunk = format_bytes(mem), mem_used = format_bytes(mu), free_disk = fd
    )
    print(statz)
    stats <- rbind.data.frame(stats, statz)

    if (is_done) {
      message(sprintf("Total lines read excl header: %s", (beg + n - 1)))
      message(sprintf("Read times: %s total, %s avg", sum(stats$t_read), mean(stats$t_read)))
      message(sprintf("Write times: %s total, %s avg", sum(stats$t_write), mean(stats$t_write)))
    }
    Sys.sleep(0.001)
  }

  stats
}
#' Bulk load a delimited text file into a duckdb table, chunk by chunk
#'
#' Reads `csvfile` with `read_vroom_chunks()` and appends every chunk to
#' `tablename` in the duckdb database file `dbfile`. The table is created from
#' the first chunk. After each chunk either a `checkpoint` is issued (duckdb
#' >= 0.2.5) or, on older versions, a separate connection is opened and shut
#' down per chunk.
#'
#' @param csvfile Path of the delimited file to load; must exist.
#' @param dbfile Path of the duckdb database file.
#' @param tablename Name of the target table.
#' @param overwrite If `TRUE`, an existing table of that name is dropped
#'   first; if `FALSE` an existing table is an error.
#' @param csv_colspec Column specification as returned by
#'   `guess_colspec_concise()`.
#' @param csv_lookahead Number of rows used for guessing the column
#'   specification when `csv_colspec` is left at its default.
#' @param csv_chunksize Number of rows per chunk.
#' @param duckdb_mem_limit Number of GB used for the `memory_limit` and, when
#'   checkpointing is available, `checkpoint_threshold` pragmas.
#' @param ... Passed on to the internal per-chunk writer; its only extra
#'   argument is `checkpoint` (default `TRUE`).
#' @return The per-chunk statistics data frame from `read_vroom_chunks()`.
duck_copy_csv <- function(csvfile, dbfile, tablename, overwrite = FALSE,
                          csv_colspec = guess_colspec_concise(csvfile, n_lookahead = csv_lookahead),
                          csv_lookahead = 1000L, csv_chunksize = 5e5, duckdb_mem_limit = 8L, ...) {
  # fail before a connection is opened
  if (!file.exists(csvfile)) {
    stop("CSV file ", csvfile, " does not exist", call. = FALSE)
  }
  file <- normalizePath(csvfile)
  # the database file may not exist before the connection is opened
  db <- normalizePath(dbfile, mustWork = FALSE)

  con <- DBI::dbConnect(duckdb::duckdb(dbdir = db))
  on.exit(DBI::dbDisconnect(con, shutdown = TRUE), add = TRUE)
  stopifnot(file.exists(db))

  # Compare as a version, not element-wise: c(0, 3, 0) >= c(0, 2, 5) is
  # TRUE, TRUE, FALSE although 0.3.0 is the newer release.
  version <- duckdb_version()
  has_checkpointing <- !anyNA(version) &&
    numeric_version(paste(version, collapse = ".")) >= "0.2.5"

  if (has_checkpointing) {
    DBI::dbExecute(con, sprintf("PRAGMA checkpoint_threshold='%sGB';", duckdb_mem_limit))
  } else {
    message(
      "Checkpointing needs > v 0.2.4 of duckdb",
      "; falling back to using per-chunk reconnections to flush WAL incrementally"
    )
  }
  DBI::dbExecute(con, sprintf("PRAGMA memory_limit='%sGB'", duckdb_mem_limit))

  if (DBI::dbExistsTable(con, tablename)) {
    if (!overwrite) {
      stop("Table ", tablename, " exists in ", db, " , please use `overwrite = T`")
    }
    DBI::dbRemoveTable(con, tablename)
  }

  # Append one chunk to `tablename`, creating the table on first use.
  select_into <- function(data, tablename, checkpoint = TRUE) {
    con_chunk <- con
    if (checkpoint && !has_checkpointing) {
      # fallback to connect/disconnect for each chunk
      con_chunk <- DBI::dbConnect(duckdb::duckdb(dbdir = db))
      on.exit(DBI::dbDisconnect(con_chunk, shutdown = TRUE), add = TRUE)
    }

    # quote the name so the SQL addresses the same table as dbExistsTable()
    target <- as.character(DBI::dbQuoteIdentifier(con_chunk, tablename))
    sql <- if (DBI::dbExistsTable(con_chunk, tablename)) {
      sprintf("INSERT INTO %s SELECT * FROM chunk;", target)
    } else {
      sprintf("CREATE TABLE %s AS SELECT * FROM chunk;", target)
    }

    # chunk fits in RAM -> expose it under the name "chunk" and copy from
    # there; unregister even when the statement fails
    duckdb::duckdb_register(con_chunk, "chunk", data)
    tryCatch(
      DBI::dbExecute(con_chunk, sql),
      finally = duckdb::duckdb_unregister(con_chunk, "chunk")
    )

    if (checkpoint && has_checkpointing) {
      DBI::dbExecute(con_chunk, "checkpoint;")
    }
    invisible(NULL)
  }

  chunk_stats <- read_vroom_chunks(
    file,
    n_chunksize = csv_chunksize, colspec = csv_colspec,
    chunk_callback_fn = function(x) select_into(x, tablename, ...)
  )

  # NOTE(review): is_ok_transaction() is not defined in this file. It is
  # called when available; otherwise the check is skipped so the collected
  # statistics are still returned after a completed load.
  if (!exists("is_ok_transaction", mode = "function")) {
    message("Skipping post-load verification: is_ok_transaction() is not available")
  } else if (!is_ok_transaction(db, tablename, csvfile)) {
    message(
      "Bulk load failed, pls remove table ", tablename, " from ", db,
      " and try again :)"
    )
  }

  chunk_stats
}
# chunk_stats <-
# duck_copy_csv(
# csvfile = "data-raw/ark/hcaf_species_native.tsv",
# dbfile = "duckdb_database",
# tablename = "hcaf_species_native", overwrite = T,
# checkpoint = TRUE, csv_chunksize = 5e6
# )
#
# library(ggplot2)
#
# visual <-
# chunk_stats %>%
# select(chunk, end, t_read, t_write, free_disk, mem_chunk) %>%
# mutate_at(vars(starts_with("t_")), as.numeric) %>%
# mutate(krps_read = 1e-3 * end / cumsum(t_read)) %>%
# mutate(krps_write = 1e-3 * end / cumsum(t_write)) %>%
# mutate(disk = readr::parse_number(chartr("G,", " .", free_disk))) %>%
# mutate(disk_used = max(disk) - disk) %>%
# mutate(chunk_mem = as.numeric(gsub(" Mb$", "", mem_chunk))) %>%
# select(chunk, t_read, t_write, krps_read, krps_write, disk_used, chunk_mem) %>%
# tidyr::pivot_longer(cols = -chunk, names_to = "variable", values_to = "value")
#
# p1 <-
# ggplot(visual, aes(x = chunk, y = value)) +
# geom_step(aes(color = variable)) +
# #geom_line(aes(color = variable), size = 1) +
# scale_color_brewer(palette = "Set2", guide = FALSE) +
# theme_minimal() +
# facet_wrap(~variable, scales = "free", ncol = 2) +
# theme(axis.title = element_blank())
#
# ggsave("~/duck_csv_01.png", p1)
#
# knitr::kable(chunk_stats %>% select(-file))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment