Skip to content

Instantly share code, notes, and snippets.

@jdnewmil
Created April 17, 2019 18:29
Show Gist options
  • Save jdnewmil/f33d1f061537d27c95368810fa81d114 to your computer and use it in GitHub Desktop.
Save jdnewmil/f33d1f061537d27c95368810fa81d114 to your computer and use it in GitHub Desktop.
Separation of quoting syntax: separate shQuote() types for the worker shell and the master (local) shell
# future_patch.R
#' Launch one PSOCK cluster worker, quoting separately for the worker shell
#' and the master (local) shell
#'
#' A patched copy of the internal `makeNodePSOCK()` from the future package.
#' The Rscript command line that starts the worker (`cmd`) is quoted with
#' `shQuote(type = workershell)` when the worker is remote, because that
#' command is interpreted by the remote shell. It is quoted with
#' `shQuote(type = mastershell)` when the worker is this machine, because
#' `system()` then runs it locally. The outer remote-shell call (rshcmd plus
#' options) is always quoted for `mastershell`.
#'
#' This function is not defined inside the future namespace, so every future
#' internal it uses must be called as `future:::name`.
#'
#' @param worker Host name of the worker. "localhost", "127.0.0.1", or any
#'   name `future:::is_localhost()` accepts is treated as this machine.
#' @param master Host name the worker connects back to. When NULL it is
#'   "localhost" for a local worker or when `revtunnel = TRUE`, otherwise this
#'   machine's node name.
#' @param port Local port (0-65535) on which this process listens for the
#'   worker to connect back.
#' @param connectTimeout Elapsed-time limit (seconds) for the worker to
#'   connect back.
#' @param timeout Socket communication timeout (seconds); finite and >= 0.
#' @param rscript Rscript command as a character vector. When NULL it is
#'   "Rscript", taken from `R.home("bin")` when `homogeneous` is TRUE.
#' @param homogeneous Logical; guessed from `worker`/`master` when NULL.
#' @param rscript_args Extra Rscript arguments.
#' @param methods If TRUE, pass `--default-packages=...,methods` to Rscript.
#' @param useXDR Passed to the worker as XDR=; also selects the returned class.
#' @param outfile Passed to the worker as OUT=. With NULL the worker's output
#'   is not redirected.
#' @param renice A positive value prefixes the command with
#'   `nice --adjustment=`.
#' @param rshcmd Remote-shell command, or "<type>" placeholders to search for
#'   with `future:::find_rshcmd()`. NULL or "" means search for any.
#' @param user Optional remote user name (passed as `-l user`).
#' @param revtunnel If TRUE, add an `-R` reverse-tunnel option for remote
#'   workers.
#' @param rshlogfile TRUE for a temporary log file, a file path, or
#'   NULL/FALSE for no remote-shell log.
#' @param rshopts Additional remote-shell options.
#' @param rank Worker number used in messages. For a remote worker with a
#'   reverse tunnel it also offsets the worker-side port.
#' @param manual If TRUE, print how to start the worker by hand instead of
#'   launching it, then wait for it to connect.
#' @param dryrun If TRUE, print the instructions and return NULL.
#' @param verbose If TRUE, report progress via `message()`.
#' @param workershell `shQuote()` type ("sh", "csh", "cmd", "cmd2") of the
#'   shell that runs the worker command on a remote worker.
#' @param mastershell `shQuote()` type of this machine's shell.
#' @return A list with elements `con`, `host`, `rank` and `rshlogfile`, of
#'   class "SOCKnode" (or "SOCK0node" when `useXDR = FALSE`). Returns NULL
#'   when `dryrun = TRUE`.
mymakeNodePSOCK <- function(worker = "localhost",
                            master = NULL,
                            port,
                            connectTimeout = getOption(
                              "future.makeNodePSOCK.connectTimeout",
                              as.numeric(Sys.getenv(
                                "R_FUTURE_MAKENODEPSOCK_CONNECTTIMEOUT",
                                2 * 60
                              ))
                            ),
                            timeout = getOption(
                              "future.makeNodePSOCK.timeout",
                              as.numeric(Sys.getenv(
                                "R_FUTURE_MAKENODEPSOCK_TIMEOUT",
                                30 * 24 * 60 * 60
                              ))
                            ),
                            rscript = NULL,
                            homogeneous = NULL,
                            rscript_args = NULL,
                            methods = TRUE,
                            useXDR = TRUE,
                            outfile = "/dev/null",
                            renice = NA_integer_,
                            rshcmd = getOption(
                              "future.makeNodePSOCK.rshcmd",
                              Sys.getenv("R_FUTURE_MAKENODEPSOCK_RSHCMD")
                            ),
                            user = NULL,
                            revtunnel = TRUE,
                            rshlogfile = NULL,
                            rshopts = getOption(
                              "future.makeNodePSOCK.rshopts",
                              Sys.getenv("R_FUTURE_MAKENODEPSOCK_RSHOPTS")
                            ),
                            rank = 1L,
                            manual = FALSE,
                            dryrun = FALSE,
                            verbose = FALSE,
                            workershell = "sh",
                            mastershell = "cmd") {
  ## ---- Where does the worker run? -------------------------------------
  localMachine <- is.element(worker, c("localhost", "127.0.0.1"))
  if (!localMachine) {
    localMachine <- future:::is_localhost(worker)
    if (localMachine) worker <- "localhost"
  }
  attr(worker, "localhost") <- localMachine

  ## Shell that interprets the worker command line `cmd`: the remote
  ## worker's shell, or this machine's shell when the worker is local
  ## (then `cmd` is passed straight to the local system() call).
  cmdshell <- if (localMachine) mastershell else workershell

  ## ---- Validate and normalize arguments -------------------------------
  manual <- as.logical(manual)
  future:::stop_if_not(length(manual) == 1L, !is.na(manual))
  dryrun <- as.logical(dryrun)
  future:::stop_if_not(length(dryrun) == 1L, !is.na(dryrun))

  if (identical(rshcmd, "")) rshcmd <- NULL
  if (!is.null(rshcmd)) {
    rshcmd <- as.character(rshcmd)
    future:::stop_if_not(length(rshcmd) >= 1L)
  }

  if (identical(rshopts, "")) rshopts <- NULL
  rshopts <- as.character(rshopts)

  user <- as.character(user)
  future:::stop_if_not(length(user) <= 1L)

  port <- as.integer(port)
  ## Check the length first so `||` never sees a zero-length/vector operand.
  if (length(port) != 1L || is.na(port) || port < 0L || port > 65535L) {
    stop("Invalid port: ", port)
  }

  revtunnel <- as.logical(revtunnel)
  future:::stop_if_not(length(revtunnel) == 1L, !is.na(revtunnel))

  if (!is.null(rshlogfile)) {
    if (is.logical(rshlogfile)) {
      future:::stop_if_not(!is.na(rshlogfile))
      if (rshlogfile) {
        rshlogfile <- tempfile(pattern = "future_makeClusterPSOCK_",
                               fileext = ".log")
      } else {
        rshlogfile <- NULL
      }
    } else {
      ## normalizePath() was deliberately disabled by the patch author here;
      ## the path is used as given. NOTE(review): reason not stated.
      rshlogfile <- as.character(rshlogfile)
    }
  }

  if (is.null(master)) {
    master <- if (localMachine || revtunnel) {
      "localhost"
    } else {
      Sys.info()[["nodename"]]
    }
  }
  future:::stop_if_not(!is.null(master))

  timeout <- as.numeric(timeout)
  future:::stop_if_not(length(timeout) == 1L, !is.na(timeout),
                       is.finite(timeout), timeout >= 0)

  methods <- as.logical(methods)
  future:::stop_if_not(length(methods) == 1L, !is.na(methods))

  if (is.null(homogeneous)) {
    homogeneous <- localMachine ||
      (!revtunnel && future:::is_localhost(master)) ||
      (!future:::is_ip_number(worker) && !future:::is_fqdn(worker))
  }
  homogeneous <- as.logical(homogeneous)
  future:::stop_if_not(length(homogeneous) == 1L, !is.na(homogeneous))

  if (is.null(rscript)) {
    rscript <- "Rscript"
    if (homogeneous) rscript <- file.path(R.home("bin"), rscript)
  } else {
    rscript <- as.character(rscript)
    future:::stop_if_not(length(rscript) >= 1L)
    ## Resolve through PATH when found locally; otherwise keep the value
    ## as given. It may only exist on the worker, so it is not normalized.
    bin <- Sys.which(rscript[1])
    if (bin == "") bin <- rscript[1]
    rscript[1] <- bin
  }

  rscript_args <- as.character(rscript_args)
  useXDR <- as.logical(useXDR)
  future:::stop_if_not(length(useXDR) == 1L, !is.na(useXDR))
  future:::stop_if_not(is.null(outfile) || is.character(outfile))
  renice <- as.integer(renice)
  future:::stop_if_not(length(renice) == 1L)
  rank <- as.integer(rank)
  future:::stop_if_not(length(rank) == 1L, !is.na(rank))
  verbose <- as.logical(verbose)
  future:::stop_if_not(length(verbose) == 1L, !is.na(verbose))
  verbose_prefix <- "[local output] "

  ## ---- Build the worker command line ----------------------------------
  if (!any(grepl("parallel:::.slaveRSOCK()", rscript_args, fixed = TRUE))) {
    rscript_args <- c(rscript_args, "-e",
                      shQuote("parallel:::.slaveRSOCK()", type = cmdshell))
  }

  ## For local workers, probe whether the worker can record its PID so that
  ## it can be killed if it fails to connect back.
  pidfile <- NULL
  if (localMachine && !dryrun) {
    autoKill <- isTRUE(getOption(
      "future.makeNodePSOCK.autoKill",
      as.logical(Sys.getenv("R_FUTURE_MAKENODEPSOCK_AUTOKILL", TRUE))
    ))
    if (autoKill) {
      pidfile <- tempfile(pattern = sprintf("future.parent=%d.", Sys.getpid()),
                          fileext = ".pid")
      ## The path is embedded in R string literals below. Windows temp paths
      ## contain backslashes (e.g. "\U...") that R would parse as escapes.
      if (.Platform$OS.type == "windows") {
        pidfile <- chartr("\\", "/", pidfile)
      }
      pidcode <- sprintf("try(cat(Sys.getpid(),file=\"%s\"), silent = TRUE)",
                         pidfile)
      rscript_pid_args <- c("-e", shQuote(pidcode, type = cmdshell))
      ## The inner 'sh' quoting builds an R string literal inside the
      ## expression. The outer quoting is for the shell running test_cmd.
      probe_expr <- sprintf("file.exists(%s)", shQuote(pidfile, type = "sh"))
      test_cmd <- paste(c(shQuote(rscript, type = cmdshell), rscript_pid_args,
                          "-e", shQuote(probe_expr, type = cmdshell)),
                        collapse = " ")
      if (verbose) {
        message("Testing if worker's PID can be inferred: ", sQuote(test_cmd))
      }
      input <- if (any(grepl("singularity", rscript, ignore.case = TRUE))) {
        ""
      } else {
        NULL
      }
      res <- system(test_cmd, intern = TRUE, input = input)
      status <- attr(res, "status")
      suppressWarnings(file.remove(pidfile))
      if ((is.null(status) || status == 0L) && any(grepl("TRUE", res))) {
        if (verbose) message("- Possible to infer worker's PID: TRUE")
        rscript_args <- c(rscript_pid_args, rscript_args)
      } else {
        if (verbose) message("- Possible to infer worker's PID: FALSE")
        pidfile <- NULL
      }
    }
  }

  rscript_label <- getOption("future.makeNodePSOCK.rscript_label",
                             Sys.getenv("R_FUTURE_MAKENODEPSOCK_RSCRIPT_LABEL"))
  if (!is.null(rscript_label) && nzchar(rscript_label) &&
      !isFALSE(as.logical(rscript_label))) {
    if (isTRUE(as.logical(rscript_label))) {
      script <- grep("[.]R$", commandArgs(), value = TRUE)[1]
      if (is.na(script)) script <- "UNKNOWN"
      rscript_label <- sprintf("%s:%s:%s:%s", script, Sys.getpid(),
                               Sys.info()[["nodename"]], Sys.info()[["user"]])
    }
    ## The label is part of the worker command, so quote it for `cmdshell`
    ## like the other worker arguments.
    rscript_args <- c("-e",
                      shQuote(paste0("#label=", rscript_label), type = cmdshell),
                      rscript_args)
  }

  if (methods) {
    rscript_args <- c(
      "--default-packages=datasets,utils,grDevices,graphics,stats,methods",
      rscript_args
    )
  }

  ## With a reverse tunnel to a remote worker, each rank uses its own
  ## worker-side port, forwarded back to `port` here (see the -R option).
  rscript_port <- if (!localMachine && revtunnel) port + (rank - 1L) else port

  rscript <- paste(shQuote(rscript, type = cmdshell), collapse = " ")
  rscript_args <- paste(rscript_args, collapse = " ")
  envvars <- paste0("MASTER=", master, " PORT=", rscript_port,
                    " OUT=", outfile, " TIMEOUT=", timeout, " XDR=", useXDR)
  cmd <- paste(rscript, rscript_args, envvars)
  if (!is.na(renice) && renice > 0L) {
    cmd <- sprintf("nice --adjustment=%d %s", renice, cmd)
  }

  ## ---- Wrap the command in a remote-shell call for remote workers -----
  if (!localMachine) {
    find <- is.null(rshcmd)
    rsh_types <- NULL
    if (find) {
      if (verbose) {
        message(sprintf("%sWill search for all 'rshcmd' available\n",
                        verbose_prefix))
      }
    } else if (all(grepl("^<[a-zA-Z-]+>$", rshcmd))) {
      find <- TRUE
      if (verbose) {
        message(sprintf("%sWill search for specified 'rshcmd' types: %s\n",
                        verbose_prefix, paste(sQuote(rshcmd), collapse = ", ")))
      }
      rsh_types <- gsub("^<([a-zA-Z-]+)>$", "\\1", rshcmd)
    }

    if (find) {
      rshcmd <- future:::find_rshcmd(which = rsh_types,
                                     must_work = !manual && !dryrun)
      if (verbose) {
        found <- unlist(lapply(rshcmd, FUN = function(r) {
          sprintf("%s [type=%s, version=%s]",
                  paste(sQuote(r), collapse = ", "),
                  sQuote(attr(r, "type")), sQuote(attr(r, "version")))
        }))
        found <- paste(sprintf("%s %d. %s", verbose_prefix,
                               seq_along(found), found), collapse = "\n")
        message(sprintf("%sFound the following available 'rshcmd':\n%s",
                        verbose_prefix, found))
      }
      rshcmd <- rshcmd[[1]]
    } else {
      if (is.null(attr(rshcmd, "type"))) attr(rshcmd, "type") <- "<unknown>"
      if (is.null(attr(rshcmd, "version"))) attr(rshcmd, "version") <- "<unknown>"
    }

    type_info <- sprintf("type=%s, version=%s", sQuote(attr(rshcmd, "type")),
                         sQuote(attr(rshcmd, "version")))
    rshcmd_label <- sprintf("%s [%s]", paste(sQuote(rshcmd), collapse = ", "),
                            type_info)
    if (verbose) {
      message(sprintf("%sUsing 'rshcmd': %s", verbose_prefix, rshcmd_label))
    }

    if (length(user) == 1L) rshopts <- c("-l", user, rshopts)
    if (revtunnel) {
      rshopts <- c(sprintf("-R %d:%s:%d", rscript_port, master, port), rshopts)
      if (isTRUE(attr(rshcmd, "OpenSSH_for_Windows"))) {
        ver <- future:::windows_build_version()
        if (!is.null(ver) && ver <= "10.0.17763.253") {
          msg <- sprintf("WARNING: You're running Windows 10 (build %s) where this 'rshcmd' (%s) may not support reverse tunneling (revtunnel = TRUE) resulting in worker failing to launch",
                         ver, paste(sQuote(rshcmd), collapse = ", "))
          if (verbose) message(c(verbose_prefix, msg))
        }
      }
    }

    if (is.character(rshlogfile)) {
      ## plink-like commands take '-sshlog'; anything else gets '-E'.
      rshlogflag <- if (grepl("[Pp][Ll][Ii][Nn][Kk]", rshcmd[1])) {
        "-sshlog"
      } else {
        "-E"
      }
      rshopts <- c(sprintf("%s %s", rshlogflag,
                           shQuote(rshlogfile, type = mastershell)),
                   rshopts)
    }
    rshopts <- paste(rshopts, collapse = " ")
    rsh_call <- paste(paste(shQuote(rshcmd, type = mastershell), collapse = " "),
                      rshopts, worker)
    ## The whole worker command becomes one argument of the local rsh call.
    local_cmd <- paste(rsh_call, shQuote(cmd, type = mastershell))
  } else {
    local_cmd <- cmd
  }
  future:::stop_if_not(length(local_cmd) == 1L)

  ## ---- Launch (or describe how to launch) the worker ------------------
  is_worker_output_visible <- is.null(outfile)

  if (manual || dryrun) {
    msg <- c("----------------------------------------------------------------------")
    if (localMachine) {
      msg <- c(msg,
               sprintf("Manually, start worker #%s on local machine %s with:",
                       rank, sQuote(worker)),
               sprintf("\n %s\n", cmd))
    } else {
      msg <- c(msg,
               sprintf("Manually, (i) login into external machine %s:",
                       sQuote(worker)),
               sprintf("\n %s\n", rsh_call))
      msg <- c(msg,
               sprintf("and (ii) start worker #%s from there:", rank),
               sprintf("\n %s\n", cmd))
      msg <- c(msg,
               sprintf("Alternatively, start worker #%s from the local machine by combining both steps in a single call:",
                       rank),
               sprintf("\n %s\n", local_cmd))
    }
    msg <- paste(c(msg, ""), collapse = "\n")
    cat(msg)
    utils::flush.console()
    if (dryrun) return(NULL)
  } else {
    if (verbose) {
      message(sprintf("%sStarting worker #%s on %s: %s",
                      verbose_prefix, rank, sQuote(worker), local_cmd))
    }
    input <- if (.Platform$OS.type == "windows") "" else NULL
    res <- system(local_cmd, wait = FALSE, input = input)
    if (verbose) {
      message(sprintf("%s- Exit code of system() call: %s", verbose_prefix, res))
    }
    if (res != 0) {
      warning(sprintf("system(%s) had a non-zero exit code: %d", local_cmd, res))
    }
  }

  if (verbose) {
    message(sprintf("%sWaiting for worker #%s on %s to connect back",
                    verbose_prefix, rank, sQuote(worker)))
    if (is_worker_output_visible) {
      if (.Platform$OS.type == "windows") {
        message(sprintf("%s- Detected 'outfile=NULL' on Windows: this will make the output from the background worker visible when running R from a terminal, but it will most likely not be visible when using a GUI.",
                        verbose_prefix))
      } else {
        message(sprintf("%s- Detected 'outfile=NULL': this will make the output from the background worker visible",
                        verbose_prefix))
      }
    }
  }

  ## ---- Wait for the worker to connect back ----------------------------
  ## On failure the error is rethrown with a detailed diagnostic message.
  con <- local({
    setTimeLimit(elapsed = connectTimeout)
    on.exit(setTimeLimit(elapsed = Inf))
    warnings <- list()
    tryCatch({
      withCallingHandlers({
        socketConnection("localhost", port = port, server = TRUE,
                         blocking = TRUE, open = "a+b", timeout = timeout)
      }, warning = function(w) {
        if (verbose) {
          message(sprintf("%sDetected a warning from socketConnection(): %s",
                          verbose_prefix, sQuote(conditionMessage(w))))
        }
        ## Collected for the diagnostic message if connecting fails.
        warnings <<- c(warnings, list(w))
      })
    }, error = function(ex) {
      setTimeLimit(elapsed = Inf)
      machineType <- if (localMachine) "local" else "remote"
      msg <- sprintf("Failed to launch and connect to R worker on %s machine %s from local machine %s.\n",
                     machineType, sQuote(worker), sQuote(Sys.info()[["nodename"]]))

      cmsg <- conditionMessage(ex)
      if (grepl(gettext("reached elapsed time limit"), cmsg)) {
        msg <- c(msg, sprintf(" * The error produced by socketConnection() was: %s (which suggests that the connection timeout of %.0f seconds (argument 'connectTimeout') kicked in)\n",
                              sQuote(cmsg), connectTimeout))
      } else {
        msg <- c(msg, sprintf(" * The error produced by socketConnection() was: %s\n",
                              sQuote(cmsg)))
      }

      if (length(warnings) > 0) {
        msg <- c(msg, sprintf(" * In addition, socketConnection() produced %d warning(s):\n",
                              length(warnings)))
        for (kk in seq_along(warnings)) {
          cmsg <- conditionMessage(warnings[[kk]])
          if (grepl("port [0-9]+ cannot be opened", cmsg)) {
            msg <- c(msg, sprintf(" - Warning #%d: %s (which suggests that this port is either already occupied by another process or blocked by the firewall on your local machine)\n",
                                  kk, sQuote(cmsg)))
          } else {
            msg <- c(msg, sprintf(" - Warning #%d: %s\n", kk, sQuote(cmsg)))
          }
        }
      }

      msg <- c(msg, sprintf(" * The localhost socket connection that failed to connect to the R worker used port %d using a communication timeout of %.0f seconds and a connection timeout of %.0f seconds.\n",
                            port, timeout, connectTimeout))
      msg <- c(msg, sprintf(" * Worker launch call: %s.\n", local_cmd))

      ## Best effort: kill the local worker if its PID was recorded.
      pid <- future:::readWorkerPID(pidfile)
      if (!is.null(pid)) {
        if (verbose) {
          message(sprintf("Killing worker process (PID %d) if still alive", pid))
        }
        success <- future:::pid_kill(pid)
        if (verbose) {
          message(sprintf("Worker (PID %d) was successfully killed: %s",
                          pid, success))
        }
        msg <- c(msg, sprintf(" * Worker (PID %d) was successfully killed: %s\n",
                              pid, success))
      } else if (localMachine) {
        msg <- c(msg, " * Failed to kill local worker because its PID could not be identified.\n")
      }

      suggestions <- NULL
      if (!verbose) {
        suggestions <- c(suggestions, "Set 'verbose=TRUE' to see more details.")
      }
      if (.Platform$OS.type == "windows") {
        if (is_worker_output_visible) {
          suggestions <- c(suggestions, "On Windows, output from worker when using 'outfile=NULL' is only visible when running R from a terminal (not a GUI).")
        } else {
          suggestions <- c(suggestions, "On Windows, to see output from worker, set 'outfile=NULL' and run R from a terminal (not a GUI).")
        }
      } else if (!is_worker_output_visible) {
        suggestions <- c(suggestions, "Set 'outfile=NULL' to see output from worker.")
      }

      if (is.character(rshlogfile)) {
        smsg <- sprintf("Inspect the content of log file %s for %s.",
                        sQuote(rshlogfile), sQuote(rshcmd))
        lmsg <- tryCatch(readLines(rshlogfile, n = 15L, warn = FALSE),
                         error = function(ex) NULL)
        if (length(lmsg) > 0) {
          lmsg <- sprintf(" %2d: %s", seq_along(lmsg), lmsg)
          smsg <- sprintf("%s The first %d lines are:\n%s",
                          smsg, length(lmsg), paste(lmsg, collapse = "\n"))
        }
        suggestions <- c(suggestions, smsg)
      } else {
        suggestions <- c(suggestions,
                         sprintf("Set 'rshlogfile=TRUE' to enable logging for %s.",
                                 sQuote(rshcmd)))
      }

      if (!localMachine && revtunnel &&
          isTRUE(attr(rshcmd, "OpenSSH_for_Windows"))) {
        suggestions <- c(suggestions, sprintf("The 'rshcmd' (%s) used may not support reverse tunneling (revtunnel = TRUE). See ?future::makeClusterPSOCK for alternatives.\n",
                                              rshcmd_label))
      }

      if (length(suggestions) > 0) {
        suggestions <- sprintf(" - Suggestion #%d: %s\n",
                               seq_along(suggestions), suggestions)
        msg <- c(msg, " * Troubleshooting suggestions:\n", suggestions)
      }

      msg <- paste(msg, collapse = "")
      ex$message <- msg
      ## Rethrow with a larger warning.length so the long message is not cut.
      local({
        oopts <- options(warning.length = 2000L)
        on.exit(options(oopts))
        stop(ex)
      })
    })
  })

  setTimeLimit(elapsed = Inf)
  if (verbose) {
    message(sprintf("%sConnection with worker #%s on %s established",
                    verbose_prefix, rank, sQuote(worker)))
  }

  structure(list(con = con, host = worker, rank = rank, rshlogfile = rshlogfile),
            class = if (useXDR) "SOCKnode" else "SOCK0node")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment