Skip to content

Instantly share code, notes, and snippets.

@goldingn
Created April 14, 2018 00:50
Show Gist options
  • Save goldingn/d5a3aebfbc63eaadd92f0ff5ca811a5d to your computer and use it in GitHub Desktop.
Save goldingn/d5a3aebfbc63eaadd92f0ff5ca811a5d to your computer and use it in GitHub Desktop.
prototype of parallel progress reporting (for processes on the same file system)
# Progress information from parallel processes (that share a filesystem).
# The master function creates a tempfile for each process, spawns the processes,
# and passes each one the location of its tempfile; each process writes its
# progress information into its tempfile; the master function polls those files
# and prints the progress information to the screen, overwriting the previous
# line as a progress bar does.
library (future)
# Shared environment holding the path of the current job's progress file, so
# that update_parallel_progress() can find it without taking it as an argument.
# This works around scoping issues; a package namespace could serve the same
# purpose, though a more elegant solution probably exists. The empty-string
# default means cat(file = "") writes to the console.
mock_namespace <- new.env()
assign("file", "", envir = mock_namespace)
# Users can call this from inside their job code to report progress. It writes
# the percentage completed (i out of n, rounded to a whole number) to the path
# currently stored in mock_namespace$file. Ideally this would be replaced with a
# progress bar.
update_parallel_progress <- function (i, n) {
  percent_done <- round(100 * i / n)
  # cat() overwrites by default (append = FALSE), so the file only ever holds
  # the latest value; an empty path ("") sends the text to the console instead
  cat(sprintf("%i%%\n", percent_done), file = mock_namespace[["file"]])
}
# Evaluate one job. job_info is a list with `file` (the job's progress file)
# and `expression` (an unevaluated expression). The file path is stored in
# mock_namespace so that update_parallel_progress() calls made while the
# expression runs write to it. The expression is evaluated in this function's
# own frame, and its value is returned.
run_job <- function (job_info) {
  assign("file", job_info$file, envir = mock_namespace)
  eval(job_info$expression, envir = environment())
}
# TRUE when resolved() reports every future in the list as resolved (also TRUE
# for an empty list); resolved() is queried for each future.
all_resolved <- function (futures) {
  all(vapply(futures, FUN = resolved, FUN.VALUE = logical(1)))
}
# Read the last line of a job's progress file. Returns NA_character_ when the
# file is empty or cannot be read cleanly, e.g. because a worker has truncated
# it and not yet written the new value.
read_progress_line <- function (file) {
  lines <- tryCatch(readLines(file, warn = FALSE),
                    error = function (e) character(0),
                    warning = function (w) character(0))
  if (length(lines) == 0) {
    return(NA_character_)
  }
  lines[[length(lines)]]
}

# Replicate expr in parallel, creating one future per replicate, and print live
# progress information for all the jobs on a single line that is overwritten on
# each poll.
#
# n: number of replicates.
# expr: expression to replicate. It is captured unevaluated and evaluated in
#   the environment future_replicate() was called from, as base::replicate()
#   does.
# simplify: as for base::replicate(); FALSE returns a list, "array" (the
#   default) allows simplification to a higher-rank array.
# poll_interval: seconds to sleep between polls of the progress files, so the
#   master process does not busy-wait while the jobs run.
# Returns the job values as a list, simplified unless simplify is FALSE.
future_replicate <- function (n, expr, simplify = "array",
                              poll_interval = 0.1) {
  jobs <- seq_len(n)

  # capture the expression and the calling environment once, up front; calling
  # eval.parent() inside the future would resolve against the future's own
  # call stack rather than against this function's caller
  expr_call <- substitute(expr)
  caller_env <- parent.frame()

  # create a communication file per job and seed it, so there is always
  # something to read
  files <- vapply(jobs, function (job) tempfile(), "")
  for (file in files) {
    cat("0%\n", file = file)
  }

  # on exit, restore the master's progress target and delete the files
  old_file <- mock_namespace$file
  on.exit({
    assign("file", old_file, envir = mock_namespace)
    unlink(files)
  }, add = TRUE)

  # dispatch the jobs. Each job's file is set in this process immediately
  # before its future is created; this relies on the backend evaluating the
  # future with this process's state as of creation (presumably true for
  # forked and sequential futures -- confirm for other plans)
  futures <- vector("list", length(jobs))
  for (job in jobs) {
    assign("file", files[[job]], envir = mock_namespace)
    futures[[job]] <- future(eval(expr_call, envir = caller_env))
  }

  # poll the files until all the jobs are complete
  if (length(jobs) > 0) {
    job_text <- paste("job", jobs)
    progress_text <- rep("0%", length(jobs))
    repeat {
      # check completion before reading, so the last pass shows final progress
      done <- all_resolved(futures)
      latest <- vapply(files, read_progress_line, "", USE.NAMES = FALSE)
      # keep the previous value for any file caught mid-rewrite
      fresh <- !is.na(latest)
      progress_text[fresh] <- latest[fresh]
      cat("\r", paste0(job_text, ": ", progress_text, collapse = " "))
      flush.console()
      if (done) {
        break
      }
      Sys.sleep(poll_interval)
    }
  }
  cat("\n")

  # get the values, optionally simplify, and return
  results <- lapply(futures, value)
  if (!identical(simplify, FALSE) && length(results) > 0) {
    results <- simplify2array(results, higher = (simplify == "array"))
  }
  results
}
# # demo of a user-defined function, with parallel progress information
#
# library (future)
#
# foo <- function (n) {
# for (i in seq_len(n)) {
# update_parallel_progress(i, n)
# Sys.sleep(runif(1))
# }
# "success!"
# }
#
# plan(multicore)  # forked workers; "multiprocess" is deprecated in newer future
# future_replicate(4, foo(30))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment