Skip to content

Instantly share code, notes, and snippets.

@clarkfitzg
Created June 4, 2018 18:34
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save clarkfitzg/bae6d2365dfb0787d141964658e3af21 to your computer and use it in GitHub Desktop.
Save clarkfitzg/bae6d2365dfb0787d141964658e3af21 to your computer and use it in GitHub Desktop.
#!/usr/bin/env Rscript
# 2018-06-04 11:26:12
# Automatically generated from R by autoparallel version 0.0.1
library(parallel)
# Cluster configuration: the number of PSOCK workers and the timeout value
# that is later handed through connect() to socketConnection().
nworkers = 2
timeout = 600
cls = makeCluster(nworkers, type = "PSOCK")
# Template for the per-worker connection table. Every worker gets its own
# copy; on worker j, slot i is meant to hold the open socket linking
# workers j and i. Slots that are never assigned stay NULL.
workers = vector("list", length = nworkers)
# close() method for NULL that does nothing, so calling close() on every
# slot of `workers` also works for the slots that hold no connection.
close.NULL = function(...) NULL
#' Open one peer-to-peer socket between two workers
#'
#' Intended to be called on every worker. Only a worker whose global `ID`
#' equals `server` or `client` opens a socket; the connection is stored in
#' that worker's global `workers` list under the index of its peer.
#'
#' @param server ID of the worker that opens the listening side.
#' @param client ID of the worker that opens the connecting side.
#' @param port Port number used by both sides.
#' @param sleep Seconds the client pauses before connecting (presumably to
#'   let the server side start listening first).
#' @param ... Further arguments passed on to `socketConnection()`.
#' @return `NULL`.
connect = function(server, client, port, sleep = 0.1, ...)
{
    is_server = ID == server
    is_client = ID == client

    if (is_server) {
        # Listening side: keep the socket under the client's index.
        workers[[client]] <<- socketConnection(port = port, server = TRUE,
            blocking = TRUE, open = "a+b", ...)
    }

    if (is_client) {
        # Connecting side: wait first, then keep the socket under the
        # server's index.
        Sys.sleep(sleep)
        workers[[server]] <<- socketConnection(port = port, server = FALSE,
            blocking = TRUE, open = "a+b", ...)
    }

    NULL
}
# Copy the connection table, the connect() helper and the NULL close method
# into the global environment of every worker.
clusterExport(cls, c("workers", "connect", "close.NULL"))
# Give each worker a global variable ID taken from 1..nworkers, one value
# per worker. seq_len() is used so the sequence is empty (rather than
# c(1, 0)) should nworkers ever be 0.
clusterMap(cls, assign, "ID", seq_len(nworkers)
    , MoreArgs = list(envir = .GlobalEnv))
# Peer-to-peer topology: one row per socket, naming the worker that listens
# (server), the worker that connects (client) and the port they share.
socket_map = read.csv(text = '
"server","client","port"
1,2,33000
')
# Open the sockets one row at a time. Every worker runs connect() for each
# row; connect() itself decides whether the worker is one of the two peers.
# Iterating over seq_len(nrow(...)) means an empty socket_map simply opens
# no connections.
for (i in seq_len(nrow(socket_map))) {
    clusterCall(cls, connect, socket_map$server[i], socket_map$client[i]
        , socket_map$port[i], timeout = timeout)
}
# Code strings to be evaluated on the workers: the first string is written
# for the worker with ID 1, the second for the worker with ID 2. Each string
# begins by checking ID and stops if it was handed to a different worker.
#
# The two programs trade intermediate values over the peer socket
# (workers[[2]] on worker 1, workers[[1]] on worker 2) with
# serialize()/unserialize(). Each unserialize() on one side is matched by a
# serialize() on the other, so the statement order inside the strings
# matters.
worker_code = c(
# Worker 1: computes a1..a5, reading b2, b3, b4 and writing a2, a3, a4.
'if(ID != 1)
stop(sprintf("Worker is attempting to execute wrong code.
This code is for 1, but manager assigned ID %s", ID))
a1 = 1
a2 = 2
b2 <- unserialize(workers[[2]])
a3 = a1 + a2 + b2
serialize(a2, workers[[2]], xdr = FALSE)
b3 <- unserialize(workers[[2]])
a4 = a2 + a3 + b3
serialize(a3, workers[[2]], xdr = FALSE)
b4 <- unserialize(workers[[2]])
a5 = a3 + a4 + b4
serialize(a4, workers[[2]], xdr = FALSE)',
############################################################
# Worker 2: computes b1..b5, writing b2, b3, b4 and reading a2, a3, a4,
# then writes b5 to the file script4.R.log.
'if(ID != 2)
stop(sprintf("Worker is attempting to execute wrong code.
This code is for 2, but manager assigned ID %s", ID))
b1 = 1
b2 = 2
serialize(b2, workers[[1]], xdr = FALSE)
a2 <- unserialize(workers[[1]])
b3 = b1 + b2 + a2
serialize(b3, workers[[1]], xdr = FALSE)
a3 <- unserialize(workers[[1]])
b4 = b2 + b3 + a3
serialize(b4, workers[[1]], xdr = FALSE)
a4 <- unserialize(workers[[1]])
b5 = b3 + b4 + a4
writeLines(as.character(b5), "script4.R.log")'
)
# Parse a string of R code and evaluate it in the global environment.
# The value of the evaluated code is discarded; the function always
# returns NULL.
evalg = function(codestring)
{
    parsed = parse(text = codestring)
    eval(parsed, envir = globalenv())
    NULL
}
# Run the worker programs: the code strings are distributed over the
# cluster and each one is evaluated in a worker's global environment by
# evalg(). The strings themselves verify that they landed on the right ID.
parLapply(cl = cls, X = worker_code, fun = evalg)
# Tear down: on every worker, call close() on each slot of its `workers`
# list, then stop the cluster itself.
clusterEvalQ(cl = cls, expr = lapply(X = workers, FUN = close))
stopCluster(cl = cls)
@wlandau
Copy link

wlandau commented Jun 14, 2018

Is there a variant of serialize() that acts more like "peek" than "pop"?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment