Skip to content

Instantly share code, notes, and snippets.

@nbenn
Last active December 28, 2018 08:02
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nbenn/5c7961ee25967cfc7e93fd80275fcb46 to your computer and use it in GitHub Desktop.
Save nbenn/5c7961ee25967cfc7e93fd80275fcb46 to your computer and use it in GitHub Desktop.
Experiments with parallel group-by in data.table
dt_sequential <- function(tbl, group_by, fun, use_cols) {
time_taken <- system.time({
res <- tbl[, fun(.SD), by = group_by, .SDcols = use_cols]
})
list(result = res, timings = time_taken)
}
dt_parallel <- function(tbl, group_by, fun, use_cols, n_cores,
n_chuncks = n_cores) {
func <- function(i, dt) {
dt[GroupIndex == i, fun(.SD), by = group_by, .SDcols = use_cols]
}
tbl <- tbl[, GroupIndex := .GRP, by = group_by]
grp_map <- rep(
seq_len(n_chuncks),
lengths(parallel::splitIndices(max(tbl[["GroupIndex"]]), n_chuncks))
)
tbl <- tbl[, GroupIndex := grp_map[GroupIndex]]
if (n_cores == 1L) {
time_taken <- system.time({
res <- lapply(seq_len(n_chuncks), func, tbl)
})
} else {
time_taken <- system.time({
res <- parallel::mclapply(seq_len(n_chuncks), func, tbl,
mc.cores = n_cores)
})
}
list(result = data.table::rbindlist(res), timings = time_taken)
}
rep_run <- function(n_reps, fun, ...) {
res <- lapply(seq_len(n_reps), function(i, ...) {
time_taken <- system.time({
res <- fun(...)
})
list(result = res, overall = time_taken)
}, ...)
overall <- do.call(rbind, lapply(res, `[[`, "overall"))
res <- lapply(res, `[[`, "result")
timings <- do.call(rbind, lapply(res, `[[`, "timings"))
timings <- cbind(timings[, 1:3], overall[, 1:3])
timings <- cbind(mean = apply(timings, 2L, mean),
var = apply(timings, 2L, var))
rownames(timings) <- paste0(rep(c("user (", "system (", "elapsed ("), 2),
rep(c("focus)", "total)"), each = 3))
print(timings)
res[[1L]][["result"]]
}
new_dt <- function(n = 3e6) {
data.table::data.table(
a = rep(LETTERS, each = n),
b = rep(letters, times = n),
c = runif(length(letters) * n, 0, 1),
d = runif(length(letters) * n, 1, 2),
e = runif(length(letters) * n, 1, 3),
f = rnorm(length(letters) * n, 0, 1),
g = rnorm(length(letters) * n, 1, 2),
h = rnorm(length(letters) * n, 2, 1)
)
}
col_var <- function(x) lapply(x, function(y) {
# slow this down a bit
res <- 0
y_bar <- 0
for (i in y) y_bar <- y_bar + i
y_bar <- y_bar / length(y)
for (i in y) res <- res + (i - y_bar) ^ 2
res / length(y)
})
plus_one <- function(x) lapply(x, function(y) y + 1)
grp_cols <- letters[1:2]
do_cols <- letters[3:8]
n_reps <- 20L
tbl <- new_dt()
memuse::mu(tbl)
res_1 <- rep_run(n_reps, dt_sequential, tbl, grp_cols, col_var, do_cols)
res_2 <- rep_run(n_reps, dt_parallel, tbl, grp_cols, col_var, do_cols, 1L, 2L)
res_3 <- rep_run(n_reps, dt_parallel, tbl, grp_cols, col_var, do_cols, 2L, 2L)
res_4 <- dt_parallel(tbl, grp_cols, plus_one, do_cols, 2L, 2L)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment