Skip to content

Instantly share code, notes, and snippets.

@mschubert
Last active August 29, 2015 14:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mschubert/79aae1c2b4f9500c05d3 to your computer and use it in GitHub Desktop.
Save mschubert/79aae1c2b4f9500c05d3 to your computer and use it in GitHub Desktop.
Minimal example of using rzmq to distribute work to worker jobs submitted via LSF
# LSF submission script template; the double-curly placeholders are filled in
# by infuser (a value after "|" is the default used when none is supplied).
#BSUB-J {{ job_name }} # name of the job / array jobs
#BSUB-o {{ log_file | /dev/null }} # output is sent to logfile, stdout + stderr by default
#BSUB-q {{ queue }} # Job queue (-q selects the queue; -P would set the project name)
#BSUB-W {{ walltime }} # Walltime in minutes
#BSUB-M {{ memory | 4096 }} # Memory requirements in Mbytes
#BSUB-R rusage[mem={{ memory | 4096 }}] # Memory requirements in Mbytes
#BSUB-R select[panfs_nobackup_research]
# start the R script non-interactively, passing the template's args after --args
R --no-save --no-restore --args "{{ args }}" < "{{ rscript }}"
library(rzmq)
library(infuser)
# Master, part 1: bind the socket the workers talk to and submit the worker
# jobs to LSF by filling in the submission template.

# TCP port the master listens on; used both for the bind address and for the
# address handed to the workers, so the two can never disagree.
port = 6124L

# values substituted into the submission template
values = list(
  queue = "research-rh6",
  walltime = 10080,
  memory = 1024,
  rscript = "worker.r",
  args = sprintf("tcp://%s:%i", Sys.info()[['nodename']], port)
)

# bind status socket (REP side; workers connect to it with the address in
# values$args)
zmq.context = init.context()
socket = init.socket(zmq.context, "ZMQ_REP")
bind.socket(socket, sprintf("tcp://*:%i", port))

# do the submissions
njobs = 10
# seq_len() yields an empty sequence for njobs = 0, where 1:njobs would not
for (j in seq_len(njobs)) {
  values$job_name = paste0("rzmq-", j)
  values$log_file = paste0(values$job_name, ".log")
  # the filled-in template is fed to bsub on stdin; system() returns the
  # command's exit status
  status = system("bsub", input=infuse("LSF.tmpl", values))
  if (status != 0)
    warning("bsub exited with status ", status, " for job ", values$job_name)
}
# Master, part 2: define the work and hand it out to the workers one item at
# a time over the already-bound `socket`.

# define the function to run on the workers and input data
fun = function(x, y) x*x+y
job_data = 1:1e4
job_const = list(y=100)
job_status = factor(rep("queued", length(job_data)), levels=c("queued", "running", "done"))
# one preallocated slot per job, so results land at their job index
job_result = vector("list", length(job_data))

# send and receive messages
# Every request gets exactly one reply: either the next queued job or id=0.
# The worker script treats a reply with id 0 as "no work left" and exits.
while(any(job_status %in% c("queued", "running"))) {
  msg = receive.socket(socket)
  print(msg)
  if (msg$id == 0) {
    # id 0 is a worker announcing itself: send the function and constants as
    # the first part of the reply (send.more=TRUE); the job follows below
    send.socket(socket, data=list(fun=fun, const=job_const), send.more=TRUE)
  } else {
    job_status[msg$id] = "done"
    # assign via list() so a NULL result keeps its slot instead of deleting
    # the element and shifting later results
    job_result[msg$id] = list(msg$result)
  }
  # first job still waiting, or NA if none is left
  id = which(job_status == "queued")[1]
  if (!is.na(id)) {
    send.socket(socket, data=list(id=id, iter=job_data[id]))
    job_status[id] = "running"
  } else {
    send.socket(socket, data=list(id=0))
  }
  Sys.sleep(0.001)
}
# Worker script.
# this should be submitted by bsub
# and get the server as argument
master = commandArgs(TRUE)[1]
#master = "tcp://ebi-002.ebi.ac.uk:6124"
# commandArgs(TRUE)[1] is NA when no argument was passed; fail here with a
# clear message instead of inside connect.socket()
if (is.na(master))
  stop("no master address given; expected an argument like tcp://host:port")
print(master)

library(rzmq)
context = init.context()
socket = init.socket(context, "ZMQ_REQ")
connect.socket(socket, master)

# announce this worker with id 0; the first message that comes back carries
# the function to run and its constant arguments
send.socket(socket, data=list(id=0))
msg = receive.socket(socket)
print(msg)
fun = msg$fun
const = msg$const

# process jobs until a message with id 0 arrives
repeat {
  msg = receive.socket(socket)
  print(msg)
  if (msg$id == 0)
    break
  # try() keeps the worker alive on errors; the error object is sent back as
  # the result
  result = try(do.call(fun, c(const, msg$iter)))
  print(result)
  send.socket(socket, data=list(id = msg$id, result=result))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment