Created April 17, 2019 18:52
-
-
Save jdnewmil/008a15162bebac9d64d46b175b5d9813 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# future_patch.R | |
# mymakeNodePSOCK(): a patched copy of future's makeNodePSOCK(), meant to be
# passed as `makeNode = mymakeNodePSOCK` to future::makeClusterPSOCK() (as
# test1.R does). Arguments mirror future::makeNodePSOCK(); see
# ?future::makeClusterPSOCK. Additions visible here:
#   * `workershell` / `mastershell`: the shQuote() `type` used when quoting
#     pieces of the command run by the worker's shell vs. pieces run by the
#     local (master) shell, e.g. "sh" for a Linux worker and "cmd" for a
#     Windows master.
#   * When the rsh client looks like PuTTY's plink, the log file is passed
#     with `-sshlog` instead of OpenSSH's `-E`.
#   * A user-supplied `rscript` that Sys.which() cannot resolve is used
#     verbatim instead of being normalized.
#
# Internal future helpers are reached with `future:::` because this file is
# source()d into the global environment, where a bare name only resolves if
# future exports it.
#
# Returns a list(con, host, rank, rshlogfile) of class "SOCKnode"
# (useXDR = TRUE) or "SOCK0node"; returns NULL when `dryrun = TRUE`.
mymakeNodePSOCK <- function(worker = "localhost",
                            master = NULL,
                            port,
                            connectTimeout = getOption(
                              "future.makeNodePSOCK.connectTimeout",
                              as.numeric(Sys.getenv(
                                "R_FUTURE_MAKENODEPSOCK_CONNECTTIMEOUT",
                                2 * 60
                              ))
                            ),
                            timeout = getOption(
                              "future.makeNodePSOCK.timeout",
                              as.numeric(Sys.getenv(
                                "R_FUTURE_MAKENODEPSOCK_TIMEOUT",
                                30 * 24 * 60 * 60
                              ))
                            ),
                            rscript = NULL,
                            homogeneous = NULL,
                            rscript_args = NULL,
                            methods = TRUE,
                            useXDR = TRUE,
                            outfile = "/dev/null",
                            renice = NA_integer_,
                            rshcmd = getOption(
                              "future.makeNodePSOCK.rshcmd",
                              Sys.getenv("R_FUTURE_MAKENODEPSOCK_RSHCMD")
                            ),
                            user = NULL,
                            revtunnel = TRUE,
                            rshlogfile = NULL,
                            rshopts = getOption(
                              "future.makeNodePSOCK.rshopts",
                              Sys.getenv("R_FUTURE_MAKENODEPSOCK_RSHOPTS")
                            ),
                            rank = 1L,
                            manual = FALSE,
                            dryrun = FALSE,
                            verbose = FALSE,
                            workershell = "sh",
                            mastershell = "cmd") {
  ## --- Validate and normalize arguments ----------------------------------
  localMachine <- is.element(worker, c("localhost", "127.0.0.1"))
  if (!localMachine) {
    localMachine <- future:::is_localhost(worker)
    if (localMachine) {
      worker <- "localhost"
    }
  }
  attr(worker, "localhost") <- localMachine

  manual <- as.logical(manual)
  future:::stop_if_not(length(manual) == 1L, !is.na(manual))
  dryrun <- as.logical(dryrun)
  future:::stop_if_not(length(dryrun) == 1L, !is.na(dryrun))

  # "" (what the defaults yield when neither option nor env var is set)
  # means "auto-detect an rsh client" further down.
  if (identical(rshcmd, "")) {
    rshcmd <- NULL
  }
  if (!is.null(rshcmd)) {
    rshcmd <- as.character(rshcmd)
    future:::stop_if_not(length(rshcmd) >= 1L)
  }
  if (identical(rshopts, "")) {
    rshopts <- NULL
  }
  rshopts <- as.character(rshopts)
  user <- as.character(user)
  future:::stop_if_not(length(user) <= 1L)

  port <- as.integer(port)
  if (is.na(port) || port < 0L || port > 65535L) {
    stop("Invalid port: ", port)
  }
  revtunnel <- as.logical(revtunnel)
  future:::stop_if_not(length(revtunnel) == 1L, !is.na(revtunnel))

  # rshlogfile: TRUE -> fresh temp file, FALSE -> no log, character -> path.
  if (!is.null(rshlogfile)) {
    if (is.logical(rshlogfile)) {
      future:::stop_if_not(!is.na(rshlogfile))
      if (rshlogfile) {
        rshlogfile <- tempfile(pattern = "future_makeClusterPSOCK_",
                               fileext = ".log")
      } else {
        rshlogfile <- NULL
      }
    } else {
      rshlogfile <- as.character(rshlogfile)
      # rshlogfile <- normalizePath(rshlogfile, mustWork = FALSE)
    }
  }

  # With a reverse tunnel (the `-R` option built below) the worker reaches
  # the master through its own "localhost".
  if (is.null(master)) {
    if (localMachine || revtunnel) {
      master <- "localhost"
    } else {
      master <- Sys.info()[["nodename"]]
    }
  }
  future:::stop_if_not(!is.null(master))

  timeout <- as.numeric(timeout)
  future:::stop_if_not(length(timeout) == 1L, !is.na(timeout),
                       is.finite(timeout), timeout >= 0)
  methods <- as.logical(methods)
  future:::stop_if_not(length(methods) == 1L, !is.na(methods))

  if (is.null(homogeneous)) {
    homogeneous <- localMachine ||
      (!revtunnel && future:::is_localhost(master)) ||
      (!future:::is_ip_number(worker) && !future:::is_fqdn(worker))
  }
  homogeneous <- as.logical(homogeneous)
  future:::stop_if_not(length(homogeneous) == 1L, !is.na(homogeneous))

  if (is.null(rscript)) {
    rscript <- "Rscript"
    if (homogeneous) {
      rscript <- file.path(R.home("bin"), rscript)
    }
  } else {
    rscript <- as.character(rscript)
    future:::stop_if_not(length(rscript) >= 1L)
    # Resolve through the local PATH when possible; otherwise keep the path
    # exactly as given (presumably because it only exists on the worker).
    bin <- Sys.which(rscript[1])
    if (bin == "") {
      bin <- rscript[1]
    }
    # bin <- normalizePath(rscript[1], mustWork = FALSE)
    rscript[1] <- bin
  }

  rscript_args <- as.character(rscript_args)
  useXDR <- as.logical(useXDR)
  future:::stop_if_not(length(useXDR) == 1L, !is.na(useXDR))
  future:::stop_if_not(is.null(outfile) || is.character(outfile))
  renice <- as.integer(renice)
  future:::stop_if_not(length(renice) == 1L)
  rank <- as.integer(rank)
  future:::stop_if_not(length(rank) == 1L, !is.na(rank))
  verbose <- as.logical(verbose)
  future:::stop_if_not(length(verbose) == 1L, !is.na(verbose))
  verbose_prefix <- "[local output] "

  ## --- Build the Rscript command line the worker runs ---------------------
  if (!any(grepl("parallel:::.slaveRSOCK()", rscript_args, fixed = TRUE))) {
    rscript_args <- c(rscript_args, "-e",
                      shQuote("parallel:::.slaveRSOCK()", workershell))
  }

  # For local workers, optionally have the worker write its PID to a file so
  # it can be killed if it never connects back (see the error handler below).
  pidfile <- NULL
  if (localMachine && !dryrun) {
    autoKill <- isTRUE(getOption(
      "future.makeNodePSOCK.autoKill",
      as.logical(Sys.getenv("R_FUTURE_MAKENODEPSOCK_AUTOKILL", TRUE))
    ))
    if (autoKill) {
      pidfile <- tempfile(pattern = sprintf("future.parent=%d.", Sys.getpid()),
                          fileext = ".pid")
      # pidfile <- normalizePath(pidfile, winslash = "/", mustWork = FALSE)
      pidcode <- sprintf("try(cat(Sys.getpid(),file=\"%s\"), silent = TRUE)",
                         pidfile)
      rscript_pid_args <- c("-e", shQuote(pidcode, mastershell))
      # Run the PID-writing code once as a probe to see whether it works.
      test_cmd <- paste(
        c(rscript, rscript_pid_args, "-e",
          shQuote(sprintf("file.exists(%s)", shQuote(pidfile, "sh")),
                  workershell)),
        collapse = " "
      )
      if (verbose) {
        message("Testing if worker's PID can be inferred: ", sQuote(test_cmd))
      }
      input <- NULL
      if (any(grepl("singularity", rscript, ignore.case = TRUE))) {
        input <- ""
      }
      res <- system(test_cmd, intern = TRUE, input = input)
      status <- attr(res, "status")
      suppressWarnings(file.remove(pidfile))
      if ((is.null(status) || status == 0L) && any(grepl("TRUE", res))) {
        if (verbose) message("- Possible to infer worker's PID: TRUE")
        rscript_args <- c(rscript_pid_args, rscript_args)
      } else {
        if (verbose) message("- Possible to infer worker's PID: FALSE")
        pidfile <- NULL
      }
    }
  }

  # Optionally prepend a no-op `-e "#label=..."` expression (an R comment);
  # a TRUE setting builds the label as script:pid:nodename:user.
  rscript_label <- getOption("future.makeNodePSOCK.rscript_label",
                             Sys.getenv("R_FUTURE_MAKENODEPSOCK_RSCRIPT_LABEL"))
  if (!is.null(rscript_label) && nzchar(rscript_label) &&
      !isFALSE(as.logical(rscript_label))) {
    if (isTRUE(as.logical(rscript_label))) {
      script <- grep("[.]R$", commandArgs(), value = TRUE)[1]
      if (is.na(script)) {
        script <- "UNKNOWN"
      }
      rscript_label <- sprintf("%s:%s:%s:%s", script, Sys.getpid(),
                               Sys.info()[["nodename"]], Sys.info()[["user"]])
    }
    rscript_args <- c("-e",
                      shQuote(paste0("#label=", rscript_label), mastershell),
                      rscript_args)
  }

  if (methods) {
    rscript_args <- c(
      "--default-packages=datasets,utils,grDevices,graphics,stats,methods",
      rscript_args
    )
  }

  # Through a reverse tunnel, each worker rank gets its own remote port.
  if (!localMachine && revtunnel) {
    rscript_port <- port + (rank - 1L)
  } else {
    rscript_port <- port
  }

  rscript <- paste(shQuote(rscript, workershell), collapse = " ")
  rscript_args <- paste(rscript_args, collapse = " ")
  # Connection settings are appended as trailing KEY=VALUE arguments.
  envvars <- paste0("MASTER=", master, " PORT=", rscript_port,
                    " OUT=", outfile, " TIMEOUT=", timeout, " XDR=", useXDR)
  cmd <- paste(rscript, rscript_args, envvars)
  if (!is.na(renice) && renice > 0L) {
    cmd <- sprintf("nice --adjustment=%d %s", renice, cmd)
  }

  ## --- Wrap the command in an rsh/ssh call for remote workers ------------
  if (!localMachine) {
    # Auto-detect the rsh client when none was given, or when only "<type>"
    # placeholders (e.g. "<ssh>") were given.
    find <- is.null(rshcmd)
    if (find) {
      rsh_types <- NULL
      if (verbose) {
        message(sprintf("%sWill search for all 'rshcmd' available\n",
                        verbose_prefix))
      }
    } else if (all(grepl("^<[a-zA-Z-]+>$", rshcmd))) {
      find <- TRUE
      if (verbose) {
        message(sprintf("%sWill search for specified 'rshcmd' types: %s\n",
                        verbose_prefix,
                        paste(sQuote(rshcmd), collapse = ", ")))
      }
      rsh_types <- gsub("^<([a-zA-Z-]+)>$", "\\1", rshcmd)
    }

    if (find) {
      # Internal helper: must be reached via `future:::` from this file.
      rshcmd <- future:::find_rshcmd(
        which = rsh_types,
        must_work = !localMachine && !manual && !dryrun
      )
      if (verbose) {
        s <- unlist(lapply(rshcmd, FUN = function(r) {
          sprintf("%s [type=%s, version=%s]",
                  paste(sQuote(r), collapse = ", "),
                  sQuote(attr(r, "type")), sQuote(attr(r, "version")))
        }))
        s <- paste(sprintf("%s %d. %s", verbose_prefix, seq_along(s), s),
                   collapse = "\n")
        message(sprintf("%sFound the following available 'rshcmd':\n%s",
                        verbose_prefix, s))
      }
      rshcmd <- rshcmd[[1]]
    } else {
      if (is.null(attr(rshcmd, "type"))) {
        attr(rshcmd, "type") <- "<unknown>"
      }
      if (is.null(attr(rshcmd, "version"))) {
        attr(rshcmd, "version") <- "<unknown>"
      }
    }

    s <- sprintf("type=%s, version=%s", sQuote(attr(rshcmd, "type")),
                 sQuote(attr(rshcmd, "version")))
    rshcmd_label <- sprintf("%s [%s]", paste(sQuote(rshcmd), collapse = ", "), s)
    if (verbose) {
      message(sprintf("%sUsing 'rshcmd': %s", verbose_prefix, rshcmd_label))
    }

    if (length(user) == 1L) {
      rshopts <- c("-l", user, rshopts)
    }
    if (revtunnel) {
      # Forward the worker-side port back to the master's listening port.
      rshopts <- c(sprintf("-R %d:%s:%d", rscript_port, master, port), rshopts)
      if (isTRUE(attr(rshcmd, "OpenSSH_for_Windows"))) {
        # Internal helper: must be reached via `future:::` from this file.
        ver <- future:::windows_build_version()
        # Compare as versions (not as strings); an unparseable value yields
        # NA, which isTRUE() treats as "no warning".
        old_build <- !is.null(ver) && isTRUE(
          numeric_version(as.character(ver), strict = FALSE) <=
            numeric_version("10.0.17763.253")
        )
        if (old_build) {
          msg <- sprintf("WARNING: You're running Windows 10 (build %s) where this 'rshcmd' (%s) may not support reverse tunneling (revtunnel = TRUE) resulting in worker failing to launch",
                         as.character(ver),
                         paste(sQuote(rshcmd), collapse = ", "))
          if (verbose) message(c(verbose_prefix, msg))
        }
      }
    }

    if (is.character(rshlogfile)) {
      # PuTTY's plink takes `-sshlog <file>`; OpenSSH's ssh takes `-E <file>`.
      rshlogflag <- if (grepl("[Pp][Ll][Ii][Nn][Kk]", rshcmd[1])) {
        "-sshlog"
      } else {
        "-E"
      }
      rshopts <- c(sprintf("%s %s", rshlogflag,
                           shQuote(rshlogfile, mastershell)),
                   rshopts)
    }
    rshopts <- paste(rshopts, collapse = " ")
    rsh_call <- paste(paste(shQuote(rshcmd, mastershell), collapse = " "),
                      rshopts, worker)
    # The worker command is quoted once more so the local shell passes it
    # through to the rsh client as a single argument.
    local_cmd <- paste(rsh_call, shQuote(cmd, mastershell))
  } else {
    local_cmd <- cmd
  }
  future:::stop_if_not(length(local_cmd) == 1L)
  is_worker_output_visible <- is.null(outfile)

  ## --- Launch the worker (or print how to launch it) ----------------------
  if (manual || dryrun) {
    msg <- c("----------------------------------------------------------------------")
    if (localMachine) {
      msg <- c(msg,
               sprintf("Manually, start worker #%s on local machine %s with:",
                       rank, sQuote(worker)),
               sprintf("\n %s\n", cmd))
    } else {
      msg <- c(msg,
               sprintf("Manually, (i) login into external machine %s:",
                       sQuote(worker)),
               sprintf("\n %s\n", rsh_call))
      msg <- c(msg,
               sprintf("and (ii) start worker #%s from there:", rank),
               sprintf("\n %s\n", cmd))
      msg <- c(msg,
               sprintf("Alternatively, start worker #%s from the local machine by combining both steps in a single call:",
                       rank),
               sprintf("\n %s\n", local_cmd))
    }
    msg <- paste(c(msg, ""), collapse = "\n")
    cat(msg)
    utils::flush.console()
    if (dryrun) {
      return(NULL)
    }
  } else {
    if (verbose) {
      message(sprintf("%sStarting worker #%s on %s: %s",
                      verbose_prefix, rank, sQuote(worker), local_cmd))
    }
    input <- if (.Platform$OS.type == "windows") "" else NULL
    res <- system(local_cmd, wait = FALSE, input = input)
    if (verbose) {
      message(sprintf("%s- Exit code of system() call: %s",
                      verbose_prefix, res))
    }
    if (res != 0) {
      warning(sprintf("system(%s) had a non-zero exit code: %d",
                      local_cmd, res))
    }
  }

  ## --- Wait for the worker to connect back --------------------------------
  if (verbose) {
    message(sprintf("%sWaiting for worker #%s on %s to connect back",
                    verbose_prefix, rank, sQuote(worker)))
    if (is_worker_output_visible) {
      if (.Platform$OS.type == "windows") {
        message(sprintf("%s- Detected 'outfile=NULL' on Windows: this will make the output from the background worker visible when running R from a terminal, but it will most likely not be visible when using a GUI.",
                        verbose_prefix))
      } else {
        message(sprintf("%s- Detected 'outfile=NULL': this will make the output from the background worker visible",
                        verbose_prefix))
      }
    }
  }

  con <- local({
    # Bound the wait by connectTimeout; always lift the limit afterwards.
    setTimeLimit(elapsed = connectTimeout)
    on.exit(setTimeLimit(elapsed = Inf))
    warnings <- list()
    tryCatch({
      withCallingHandlers({
        socketConnection("localhost", port = port, server = TRUE,
                         blocking = TRUE, open = "a+b", timeout = timeout)
      }, warning = function(w) {
        if (verbose) {
          message(sprintf("%sDetected a warning from socketConnection(): %s",
                          verbose_prefix, sQuote(conditionMessage(w))))
        }
        # Kept so they can be reported if the connection ultimately fails.
        warnings <<- c(warnings, list(w))
      })
    }, error = function(ex) {
      setTimeLimit(elapsed = Inf)
      # Assemble a detailed diagnostic and re-signal `ex` with it.
      machineType <- if (localMachine) "local" else "remote"
      msg <- sprintf("Failed to launch and connect to R worker on %s machine %s from local machine %s.\n",
                     machineType, sQuote(worker),
                     sQuote(Sys.info()[["nodename"]]))
      cmsg <- conditionMessage(ex)
      if (grepl(gettext("reached elapsed time limit"), cmsg)) {
        msg <- c(msg, sprintf(" * The error produced by socketConnection() was: %s (which suggests that the connection timeout of %.0f seconds (argument 'connectTimeout') kicked in)\n",
                              sQuote(cmsg), connectTimeout))
      } else {
        msg <- c(msg, sprintf(" * The error produced by socketConnection() was: %s\n",
                              sQuote(cmsg)))
      }
      if (length(warnings) > 0) {
        msg <- c(msg, sprintf(" * In addition, socketConnection() produced %d warning(s):\n",
                              length(warnings)))
        for (kk in seq_along(warnings)) {
          cmsg <- conditionMessage(warnings[[kk]])
          if (grepl("port [0-9]+ cannot be opened", cmsg)) {
            msg <- c(msg, sprintf(" - Warning #%d: %s (which suggests that this port is either already occupied by another process or blocked by the firewall on your local machine)\n",
                                  kk, sQuote(cmsg)))
          } else {
            msg <- c(msg, sprintf(" - Warning #%d: %s\n", kk, sQuote(cmsg)))
          }
        }
      }
      msg <- c(msg, sprintf(" * The localhost socket connection that failed to connect to the R worker used port %d using a communication timeout of %.0f seconds and a connection timeout of %.0f seconds.\n",
                            port, timeout, connectTimeout))
      msg <- c(msg, sprintf(" * Worker launch call: %s.\n", local_cmd))

      # Kill a worker that started but never connected, if its PID is known.
      pid <- future:::readWorkerPID(pidfile)
      if (!is.null(pid)) {
        if (verbose) {
          message(sprintf("Killing worker process (PID %d) if still alive",
                          pid))
        }
        # Internal helper: must be reached via `future:::` from this file.
        success <- future:::pid_kill(pid)
        if (verbose) {
          message(sprintf("Worker (PID %d) was successfully killed: %s",
                          pid, success))
        }
        msg <- c(msg, sprintf(" * Worker (PID %d) was successfully killed: %s\n",
                              pid, success))
      } else if (localMachine) {
        msg <- c(msg, " * Failed to kill local worker because its PID could not be identified.\n")
      }

      suggestions <- NULL
      if (!verbose) {
        suggestions <- c(suggestions, "Set 'verbose=TRUE' to see more details.")
      }
      if (.Platform$OS.type == "windows") {
        # Suggest setting outfile=NULL only when it is not already set,
        # matching the non-Windows branch below.
        if (!is_worker_output_visible) {
          suggestions <- c(suggestions, "On Windows, to see output from worker, set 'outfile=NULL' and run R from a terminal (not a GUI).")
        } else {
          suggestions <- c(suggestions, "On Windows, output from worker when using 'outfile=NULL' is only visible when running R from a terminal (not a GUI).")
        }
      } else if (!is_worker_output_visible) {
        suggestions <- c(suggestions, "Set 'outfile=NULL' to see output from worker.")
      }
      if (is.character(rshlogfile)) {
        smsg <- sprintf("Inspect the content of log file %s for %s.",
                        sQuote(rshlogfile), sQuote(rshcmd))
        lmsg <- tryCatch(readLines(rshlogfile, n = 15L, warn = FALSE),
                         error = function(ex) NULL)
        if (length(lmsg) > 0) {
          lmsg <- sprintf(" %2d: %s", seq_along(lmsg), lmsg)
          smsg <- sprintf("%s The first %d lines are:\n%s", smsg,
                          length(lmsg), paste(lmsg, collapse = "\n"))
        }
        suggestions <- c(suggestions, smsg)
      } else {
        suggestions <- c(suggestions,
                         sprintf("Set 'rshlogfile=TRUE' to enable logging for %s.",
                                 sQuote(rshcmd)))
      }
      if (!localMachine && revtunnel &&
          isTRUE(attr(rshcmd, "OpenSSH_for_Windows"))) {
        suggestions <- c(suggestions, sprintf("The 'rshcmd' (%s) used may not support reverse tunneling (revtunnel = TRUE). See ?future::makeClusterPSOCK for alternatives.\n",
                                              rshcmd_label))
      }
      if (length(suggestions) > 0) {
        suggestions <- sprintf(" - Suggestion #%d: %s\n",
                               seq_along(suggestions), suggestions)
        msg <- c(msg, " * Troubleshooting suggestions:\n", suggestions)
      }
      msg <- paste(msg, collapse = "")
      ex$message <- msg
      local({
        # Raise the truncation limit so the long diagnostic is shown in full.
        oopts <- options(warning.length = 2000L)
        on.exit(options(oopts))
        stop(ex)
      })
    })
  })
  setTimeLimit(elapsed = Inf)

  if (verbose) {
    message(sprintf("%sConnection with worker #%s on %s established",
                    verbose_prefix, rank, sQuote(worker)))
  }
  structure(
    list(con = con, host = worker, rank = rank, rshlogfile = rshlogfile),
    class = if (useXDR) "SOCKnode" else "SOCK0node"
  )
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# test1.R | |
# Nested-future demo. The outer future runs on "osl5405", launched through
# mymakeNodePSOCK(). Inside it, a second cluster level spans
# "osl5411"/"osl5412", and each of those resolves two further futures
# reporting their PIDs.
library(future.apply)
library(future)
source("future_patch.R")

mnode <- makeClusterPSOCK(workers = "osl5405",
                          homogeneous = FALSE,
                          rscript = "/home/verit.dnv.com/jnewm/bin/Rscript",
                          user = "jnewm",
                          makeNode = mymakeNodePSOCK)
plan(cluster, workers = mnode)

a %<-% {
  # The nested topology is configured from inside the outer future.
  # NOTE(review): newer releases of future deprecate `multiprocess`; switch
  # to `multisession` if running against a recent version -- confirm.
  plan(list(tweak(cluster, workers = c("osl5411", "osl5412")),
            multiprocess))
  a1 %<-% {
    a11 %<-% {
      Sys.getpid()
    }
    a12 %<-% {
      Sys.getpid()
    }
    list(node = system("hostname", intern = TRUE),
         mproc = Sys.getpid(),
         proc1 = a11,
         proc2 = a12)
  }
  a2 %<-% {
    a21 %<-% {
      Sys.getpid()
    }
    a22 %<-% {
      Sys.getpid()
    }
    list(node = system("hostname", intern = TRUE),
         mproc = Sys.getpid(),
         proc1 = a21,
         proc2 = a22)
  }
  list(master = system("hostname", intern = TRUE),
       slave1 = a1,
       slave2 = a2)
}

# Resolve and print the result. The remote worker is shut down even when
# resolving `a` signals an error.
tryCatch(print(a), finally = parallel::stopCluster(mnode))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment