LSF module for Julia
module LSF

# NOTE: This gist targets an early (circa Julia 0.3/0.4-dev) cluster-manager
# interface (immutable, cell, shift!, parseint, RemoteRef) and reuses
# non-exported Base internals from base/multi.jl. The imports below are an
# assumption about how those names are brought into scope so that the
# methods here extend Base's generics rather than shadowing them.
import Base: show, launch, manage
import Base: ClusterManager, LocalManager, PGRP, add_worker, create_worker

immutable LSFManager <: ClusterManager
    machines::Dict

    function LSFManager(; machines=[])
        mhist = Dict()
        for m in machines
            cnt = get(mhist, m, 0)
            mhist[m] = cnt + 1
        end
        new(mhist)
    end
end
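
# Example (illustrative): LSFManager(machines=["node1", "node1", "node2"])
# stores one entry per unique host with a worker count, i.e.
# "node1"=>2, "node2"=>1: one worker per list entry.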

show(io::IO, manager::LSFManager) = println(io, "LSFManager(machines=", manager.machines, ")")
function launch(manager::LSFManager, np::Integer, config::Dict, resp_arr::Array, machines_launch_ntfy::Condition)
    # Launch on each unique host in parallel.
    # Wait for all launches to complete.
    plaunch_ntfy = Condition()
    launch_tasks = cell(length(manager.machines))
    for (i, (machine, cnt)) in enumerate(manager.machines)
        launch_tasks[i] = @schedule launch_on_machine(manager, config, resp_arr, machines_launch_ntfy, machine, cnt, plaunch_ntfy)
    end

    while length(launch_tasks) > 0
        if istaskdone(launch_tasks[1])
            shift!(launch_tasks)
        else
            wait(plaunch_ntfy)
        end
    end

    notify(machines_launch_ntfy)
end
function launch_on_machine(manager::LSFManager, config::Dict, resp_arr::Array, machines_launch_ntfy::Condition,
                           machine::String, cnt::Integer, plaunch_ntfy::Condition)
    dir = config[:dir]
    exename = config[:exename]
    exeflags_base = config[:exeflags]

    thisconfig = copy(config) # config for this worker

    # machine could be of the format [user@]host[:port] bind_addr[:bind_port]
    machine_bind = split(machine)
    if length(machine_bind) > 1
        exeflags = `--bind-to $(machine_bind[2]) $exeflags_base`
    else
        exeflags = exeflags_base
    end
    machine_def = machine_bind[1]
    machine_def = split(machine_def, ':')
    portopt = length(machine_def) == 2 ? ` -p $(machine_def[2]) ` : ``
    sshflags = `$(config[:sshflags]) $portopt`
    thisconfig[:sshflags] = sshflags

    host = machine_def[1]
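
    # Worked example (illustrative): machine = "alice@node1:2222 10.0.0.5:9009"
    #   machine_bind == ["alice@node1:2222", "10.0.0.5:9009"], so exeflags
    #   gains `--bind-to 10.0.0.5:9009`; machine_def == ["alice@node1", "2222"],
    #   so portopt == ` -p 2222 ` and host == "alice@node1".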
    # Use ssh to launch remotely. NOTE: "user@host" and the LSF_ENVDIR path are
    # site-specific placeholders; only $host and $dir are interpolated here.
    cmd = `ssh user@host "LSF_ENVDIR=/path/to/lsf/conf bjulia LSF $host $dir/run --worker"`
    thisconfig[:machine] = host

    # start the processes first...
    maxp = config[:max_parallel]

    if config[:tunnel]
        maxp = div(maxp, 2) + 1 # Since the tunnel will also take up one ssh connection
    end
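    # e.g. with the default max_parallel=10 and tunnel=true, at most
    # div(10, 2) + 1 == 6 launches are kept in flight at once.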
    ios_to_check = []

    t_check = time()
    while cnt > 0
        ios_to_check2 = []
        for io in ios_to_check
            if nb_available(io) == 0
                push!(ios_to_check2, io)
            end
        end
        ios_to_check = ios_to_check2

        maxp_in_loop = maxp - length(ios_to_check)
        if maxp_in_loop == 0
            # wait for some time and check again
            sleep(0.1)
            if (time() - t_check) > 50
                error("Timed out waiting for launched worker")
            end
            continue
        end
        lc = cnt > maxp_in_loop ? maxp_in_loop : cnt

        io_objs = cell(lc)
        configs = cell(lc)

        for i in 1:lc
            io, pobj = open(detach(cmd), "r")
            io_objs[i] = io
            push!(ios_to_check, io)
        end

        cnt = cnt - lc

        # ...and then read the host:port info. This optimizes overall start times.
        # For ssh, the tunnel connection, if any, has to be with the specified
        # machine name, but the port needs to be forwarded to the bound
        # hostname/ip-address.
        push!(resp_arr, collect(zip(io_objs, fill(host, lc), fill(thisconfig, lc))))
        notify(machines_launch_ntfy)

        t_check = time()
    end

    notify(plaunch_ntfy)
end
function manage(manager::LSFManager, id::Integer, config::Dict, op::Symbol)
    if op == :interrupt
        if haskey(config, :ospid)
            machine = config[:machine]
            if !success(`ssh -T -a -x -o ClearAllForwardings=yes -n $(config[:sshflags]) $machine "kill -2 $(config[:ospid])"`)
                println("Error sending a Ctrl-C to julia worker $id on $machine")
            end
        else
            # This state can happen immediately after an addprocs
            println("Worker $id cannot be presently interrupted.")
        end
    elseif op == :register
        config[:ospid] = remotecall_fetch(id, getpid)
    end
end
function read_cb_responses(io::IO, config::Dict)
    return [(io, host, port, host, config) for (host, port) in read_workers_host_port(io)]
end

function read_cb_responses(io::IO, host::String, config::Dict)
    return [(io, bind_addr, port, host, config) for (bind_addr, port) in read_workers_host_port(io)]
end

read_cb_response(io::IO, host::String, port::Integer, config::Dict) = (io, host, port, host, config)
read_cb_response(host::String, port::Integer, config::Dict) = (nothing, host, port, host, config)
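
# Each element handed on to create_worker is a 5-tuple
#   (io, bind_addr, port, pubhost, config)
# where bind_addr:port is the address the worker listens on and pubhost is
# the name used for any ssh tunnel from process 1 (see the comments in
# launch_on_machine above).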
function start_cluster_workers(np::Integer, config::Dict, manager::ClusterManager, resp_arr::Array, launched_ntfy::Condition)
    # Get the cluster manager to launch the instances
    instance_sets = []
    instances_ntfy = Condition()

    t = @schedule launch(manager, np, config, instance_sets, instances_ntfy)

    while true
        if length(instance_sets) == 0
            istaskdone(t) && break
            @schedule (sleep(1); notify(instances_ntfy))
            wait(instances_ntfy)
        end

        if length(instance_sets) > 0
            instances = shift!(instance_sets)
            for inst in instances
                for (io, bind_addr, port, pubhost, wconfig) in read_cb_responses(inst...)
                    push!(resp_arr, create_worker(bind_addr, port, pubhost, io, wconfig, manager))
                    notify(launched_ntfy)
                end
            end
        end
    end

    notify(launched_ntfy)
end
function parse_ncpus(str)
    m = match(r"^ncpus:(\d+)", str)
    if m != nothing
        parseint(Int16, m.captures[1])
    else
        int16(-1)
    end
end

function parse_connection_info(str)
    m = match(r"^julia_worker:(\d+)#(.*)", str)
    if m != nothing
        (m.captures[2], parseint(Int16, m.captures[1]))
    else
        ("", int16(-1))
    end
end
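
# Examples (illustrative):
#   parse_ncpus("ncpus:4")                              -> int16(4)
#   parse_connection_info("julia_worker:9009#10.0.0.5") -> ("10.0.0.5", int16(9009))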
function read_workers_host_port(io::IO)
    io.line_buffered = true
    ncpus = -1
    while ncpus < 1
        line = readline(io)
        if line == ""
            ncpus = -1
            break
        end
        ncpus = parse_ncpus(line)
    end
    println("found $ncpus CPUs")
    if ncpus < 1
        return "", ncpus
    end

    i = 0
    r = []
    while i < ncpus
        line = readline(io)
        bind_addr, port = parse_connection_info(line)
        if bind_addr != ""
            r = [r, (bind_addr, port)]
            i += 1
        end
    end
    println("parsed $i,$ncpus, $r")
    return r
end
# Start and connect to processes via SSH, optionally through an SSH tunnel.
# The tunnel is only used from the head (process 1); the nodes are assumed
# to be mutually reachable without a tunnel, as is often the case in a cluster.
# Default value of kw arg max_parallel is the default value of MaxStartups in sshd_config.
function addprocs_internal(np::Integer;
                           tunnel=false, dir=JULIA_HOME,
                           exename=(ccall(:jl_is_debugbuild, Cint, ()) == 0 ? "./julia" : "./julia-debug"),
                           sshflags::Cmd=``, manager=LocalManager(), exeflags=``, max_parallel=10)

    config = Dict(:dir=>dir, :exename=>exename, :exeflags=>`$exeflags --worker`, :tunnel=>tunnel, :sshflags=>sshflags, :max_parallel=>max_parallel)

    ret = Array(Int, 0)
    rr_join = Array(RemoteRef, 0)

    resp_arr = []
    c = Condition()

    t = @schedule start_cluster_workers(np, config, manager, resp_arr, c)

    while true
        if length(resp_arr) == 0
            istaskdone(t) && break
            @schedule (sleep(1); notify(c))
            wait(c)
        end

        if length(resp_arr) > 0
            w = shift!(resp_arr)
            id, rr = add_worker(PGRP, w)
            push!(ret, id)
            push!(rr_join, rr)
        end
    end

    for rr in rr_join
        wait(rr)
    end

    ret
end
addprocs(np::Integer; kwargs...) = addprocs_internal(np; kwargs...)

function addprocs(machines::AbstractVector; kwargs...)
    manager_defined = any(x -> begin k, v = x; k == :manager end, kwargs)
    if manager_defined
        error("custom cluster managers unsupported on the ssh interface")
    else
        addprocs_internal(length(machines); manager=LSFManager(machines=machines), kwargs...)
    end
end

end # module LSF
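
A minimal usage sketch, assuming a Julia 0.3/0.4-dev session with this file saved as lsf.jl; the filename and host names are placeholders, and the remote hosts are assumed to provide the bjulia LSF wrapper referenced in launch_on_machine:

require("lsf.jl")   # load the module above (filename is an assumption)

# One worker is requested per list entry; repeating a host name requests
# multiple workers on that host, per the counting in the LSFManager constructor.
pids = LSF.addprocs(["node1", "node1", "node2"])
println("launched workers: ", pids)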