Different ways to parallelize a grouped dplyr operation
library(dplyr)
library(purrr)

# helper to make example data: n_groups groups, each with a
# random number of rows drawn from row_range
new_tbl <- function(n_groups, row_range = 5:10) {
  1:n_groups %>%
    map_df(~tibble(
      group_id = .x,
      value    = rnorm(sample(row_range, 1L))
    ))
}
# factory: returns a function that takes the row with the biggest value
# in `col`, sleeping `time` seconds to simulate per-group work
get_max_slicer <- function(col, time) {
  col <- enquo(col)
  function(tbl) {
    Sys.sleep(time)
    tbl %>%
      arrange(desc(!!col)) %>%
      slice(1L)
  }
}
#### functions ####

# sequential
do_sequential <- function(tbl, func) {
  tbl %>%
    group_by(group_id) %>%
    do(func(.))
}
# naive-but-easy parallel: one future per group
library(future)
plan(multiprocess)

do_small_groups <- function(tbl, func) {
  tbl %>%
    split(.$group_id) %>%
    map(~future(
      func(.x)
    )) %>%
    values() %>%
    bind_rows()
}
# smarter, probably: batch the groups so there's only one future per core
do_big_groups <- function(tbl, func, cores = availableCores()) {
  tbl %>%
    split(.$group_id %% cores) %>%  # relies on group_id being contiguous ints
    map(~future(
      do_sequential(.x, func)
    )) %>%
    values() %>%
    bind_rows()
}
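
# A hypothetical variant (not part of the benchmark below): the modulo split in
# do_big_groups assumes group_id is a contiguous integer sequence. One way to
# drop that assumption is to bucket groups by their rank instead.
do_big_groups_any_id <- function(tbl, func, cores = availableCores()) {
  tbl %>%
    mutate(.bucket = (match(group_id, unique(group_id)) - 1L) %% cores) %>%
    split(.$.bucket) %>%
    map(~future(
      do_sequential(select(.x, -.bucket), func)
    )) %>%
    values() %>%
    bind_rows()
}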
#### validate ####

tbl <- new_tbl(10L)
slicer <- get_max_slicer(value, 0.1)

funcs <- lst(
  do_sequential,
  do_small_groups,
  do_big_groups
)

results <- map(funcs, ~.x(tbl, slicer))

# compare each result to the one before
map2_lgl(
  results[-1], results[-length(results)],
  all_equal
) %>%
  all() %>%
  stopifnot()
#### benchmarking ####

library(microbenchmark)
library(tidyr)
library(ggplot2)

# benchmark every function in `functions` on a fresh table with
# `groups` groups and `time` seconds of simulated work per group
timer <- function(groups, time, functions, repeats = 3L) {
  tbl <- new_tbl(groups)
  f <- get_max_slicer(value, time)

  functions %>%
    map_dfr(
      ~microbenchmark(.x(tbl, f), times = repeats, unit = "s") %>% summary(),
      .id = "func"
    ) %>%
    select(-expr)
}
results <- list(
  iter_time = c(0.01, 0.1, 1),
  n_groups = c(10L, 20L, 40L)
) %>%
  cross_df() %>%
  mutate(timings = map2(n_groups, iter_time, timer, funcs)) %>%
  unnest(timings)
plot_times <- function(tbl) {
  tbl %>%
    ggplot(aes(x = n_groups, y = mean, color = func)) +
    facet_wrap(~iter_time) +
    scale_x_continuous(breaks = unique(tbl$n_groups)) +  # must be a simpler way?
    geom_line() + geom_point()
}

results %>%
  mutate(func = factor(func, names(funcs))) %>%
  plot_times()
# more time, parallel options only
p_results <- list(
  iter_time = c(0.05, 1, 2),
  n_groups = c(10L, 100L, 1000L)
) %>%
  cross_df() %>%
  mutate(timings = map2(n_groups, iter_time, timer, funcs[-1])) %>%
  unnest(timings)

p_results %>%
  mutate(func = factor(func, names(funcs))) %>%
  plot_times()
ClaytonJY commented Mar 23, 2018

Results on a 4-core i5:

[benchmark plot: mean time vs. n_groups (10, 20, 40) for all three functions, faceted by iter_time (0.01 s, 0.1 s, 1 s)]

Sequential can outperform the one-future-per-group approach, but only at very low group counts and per-group processing times; this is because each child worker takes time to dispatch. The big-bucket approach consistently performs better, and scales better because its communication/dispatch cost does not grow with the number of groups.
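
For a rough sense of that dispatch cost, a minimal sketch (assuming a multisession plan and the single-future value() accessor, not necessarily the backend used for the plots here) is to time a future that does no work:

library(future)
plan(multisession)

# the elapsed time is roughly the dispatch + collection overhead that the
# one-future-per-group approach pays once per group
system.time({
  f <- future(NULL)
  value(f)
})["elapsed"]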

[benchmark plot, parallel functions only: mean time vs. n_groups (10, 100, 1000), faceted by iter_time (0.05 s, 1 s, 2 s)]

Further evidence that big batches are better, though the benefit of lower communication cost is quickly overwhelmed by larger per-group times. Going to 2 s per group would make the two approaches even more similar, while scaling up to 10k groups would widen the lead of the batch-of-batches approach.
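
As a back-of-envelope model (illustrative only; the per-future overhead below is an assumed value, not a measurement from these runs): if each dispatched future adds roughly a fixed overhead on top of the real work, the one-future-per-group approach pays it n_groups times while the batch-of-batches approach pays it only once per core:

cores <- 4
overhead <- 0.03  # assumed seconds of dispatch/collection cost per future
est_time <- function(n_groups, per_group, n_futures) {
  n_groups / cores * per_group + n_futures * overhead
}

est_time(10000, 2, n_futures = 10000)  # one future per group
est_time(10000, 2, n_futures = cores)  # one future per core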
