Skip to content

Instantly share code, notes, and snippets.

@kourzanov
Created November 13, 2014 14:03
Embed
What would you like to do?
LSF module for Julia
module LSF
immutable LSFManager <: ClusterManager
machines::Dict
function LSFManager(; machines=[])
mhist = Dict()
for m in machines
cnt = get(mhist, m, 0)
mhist[m] = cnt + 1
end
new(mhist)
end
end
show(io::IO, manager::LSFManager) = println("LSFManager(machines=", manager.machines, ")")
function launch(manager::LSFManager, np::Integer, config::Dict, resp_arr::Array, machines_launch_ntfy::Condition)
# Launch on each unique host in parallel.
# Wait for all launches to complete.
plaunch_ntfy = Condition()
launch_tasks = cell(length(manager.machines))
for (i,(machine, cnt)) in enumerate(manager.machines)
launch_tasks[i] = @schedule launch_on_machine(manager, config, resp_arr, machines_launch_ntfy, machine, cnt, plaunch_ntfy)
end
while length(launch_tasks) > 0
if istaskdone(launch_tasks[1])
shift!(launch_tasks)
else
wait(plaunch_ntfy)
end
end
notify(machines_launch_ntfy)
end
function launch_on_machine(manager::LSFManager, config::Dict, resp_arr::Array, machines_launch_ntfy::Condition,
machine::String, cnt::Integer, plaunch_ntfy::Condition)
dir = config[:dir]
exename = config[:exename]
exeflags_base = config[:exeflags]
thisconfig = copy(config) # config for this worker
# machine could be of the format [user@]host[:port] bind_addr[:bind_port]
machine_bind = split(machine)
if length(machine_bind) > 1
exeflags = `--bind-to $(machine_bind[2]) $exeflags_base`
else
exeflags = exeflags_base
end
machine_def = machine_bind[1]
machine_def = split(machine_def, ':')
portopt = length(machine_def) == 2 ? ` -p $(machine_def[2]) ` : ``
sshflags = `$(config[:sshflags]) $portopt`
thisconfig[:sshflags] = sshflags
host = machine_def[1]
cmd = `ssh user@host "LSF_ENVDIR=/path/to/lsf/conf bjulia LSF $host $dir/run --worker"` # use ssh to remote launch
thisconfig[:machine] = host
# start the processes first...
maxp = config[:max_parallel]
if config[:tunnel]
maxp = div(maxp,2) + 1 # Since the tunnel will also take up one ssh connection
end
ios_to_check = []
t_check=time()
while cnt > 0
ios_to_check2 = []
for io in ios_to_check
if nb_available(io) == 0
push!(ios_to_check2, io)
end
end
ios_to_check=ios_to_check2
maxp_in_loop = maxp - length(ios_to_check)
if maxp_in_loop == 0
# wait for sometime and check again
sleep(0.1)
if (time() - t_check) > 50
error("Timed out waiting for launched worker")
end
continue
end
lc = cnt > maxp_in_loop ? maxp_in_loop : cnt
io_objs = cell(lc)
configs = cell(lc)
for i in 1:lc
io, pobj = open(detach(cmd), "r")
io_objs[i] = io
push!(ios_to_check, io)
end
cnt = cnt - lc
# ...and then read the host:port info. This optimizes overall start times.
# For ssh, the tunnel connection, if any, has to be with the specified machine name.
# but the port needs to be forwarded to the bound hostname/ip-address
push!(resp_arr, collect(zip(io_objs, fill(host, lc), fill(thisconfig, lc))))
notify(machines_launch_ntfy)
t_check=time()
end
notify(plaunch_ntfy)
end
function manage(manager::LSFManager, id::Integer, config::Dict, op::Symbol)
if op == :interrupt
if haskey(config, :ospid)
machine = config[:machine]
if !success(`ssh -T -a -x -o ClearAllForwardings=yes -n $(config[:sshflags]) $machine "kill -2 $(config[:ospid])"`)
println("Error sending a Ctrl-C to julia worker $id on $machine")
end
else
# This state can happen immediately after an addprocs
println("Worker $id cannot be presently interrupted.")
end
elseif op == :register
config[:ospid] = remotecall_fetch(id, getpid)
end
end
function read_cb_responses(io::IO, config::Dict)
return [(io, host, port, host, config) for (host,port) in read_workers_host_port(io)]
end
function read_cb_responses(io::IO, host::String, config::Dict)
return [(io, bind_addr, port, host, config) for (bind_addr, port) in read_workers_host_port(io)]
end
read_cb_response(io::IO, host::String, port::Integer, config::Dict) = (io, host, port, host, config)
read_cb_response(host::String, port::Integer, config::Dict) = (nothing, host, port, host, config)
function start_cluster_workers(np::Integer, config::Dict, manager::ClusterManager, resp_arr::Array, launched_ntfy::Condition)
# Get the cluster manager to launch the instance
instance_sets = []
instances_ntfy = Condition()
t = @schedule launch(manager, np, config, instance_sets, instances_ntfy)
while true
if (length(instance_sets) == 0)
istaskdone(t) && break
@schedule (sleep(1); notify(instances_ntfy))
wait(instances_ntfy)
end
if length(instance_sets) > 0
instances = shift!(instance_sets)
for inst in instances
#println("here")
for (io, bind_addr, port, pubhost, wconfig) in read_cb_responses(inst...)
#println("bind_addr='$bind_addr', port='$port'")
push!(resp_arr, create_worker(bind_addr, port, pubhost, io, wconfig, manager))
notify(launched_ntfy)
end
end
end
end
notify(launched_ntfy)
end
function parse_ncpus(str)
m = match(r"^ncpus:(\d+)", str)
if m != nothing
parseint(Int16, m.captures[1])
else
int16(-1)
end
end
function parse_connection_info(str)
m = match(r"^julia_worker:(\d+)#(.*)", str)
if m != nothing
(m.captures[2], parseint(Int16, m.captures[1]))
else
("", int16(-1))
end
end
function read_workers_host_port(io::IO)
io.line_buffered = true
ncpus = -1
while ncpus < 1
line = readline(io)
if line==""
ncpus=-1
break
end
ncpus = parse_ncpus(line)
end
println("found $ncpus CPUs")
if ncpus < 1
return "",ncpus
end
i=0
r=[]
while i<ncpus
line = readline(io)
bind_addr, port = parse_connection_info(line)
if bind_addr != ""
r=[r,(bind_addr, port)]
i+=1
end
end
println("parsed $i,$ncpus, $r")
return r
end
# start and connect to processes via SSH.
# optionally through an SSH tunnel.
# the tunnel is only used from the head (process 1); the nodes are assumed
# to be mutually reachable without a tunnel, as is often the case in a cluster.
# Default value of kw arg max_parallel is the default value of MaxStartups in sshd_config
function addprocs_internal(np::Integer;
tunnel=false, dir=JULIA_HOME,
exename=(ccall(:jl_is_debugbuild,Cint,())==0?"./julia":"./julia-debug"),
sshflags::Cmd=``, manager=LocalManager(), exeflags=``, max_parallel=10)
config = Dict(:dir=>dir, :exename=>exename, :exeflags=>`$exeflags --worker`, :tunnel=>tunnel, :sshflags=>sshflags, :max_parallel=>max_parallel)
ret = Array(Int, 0)
rr_join = Array(RemoteRef, 0)
resp_arr = []
c = Condition()
t = @schedule start_cluster_workers(np, config, manager, resp_arr, c)
while true
if length(resp_arr) == 0
istaskdone(t) && break
@schedule (sleep(1); notify(c))
wait(c)
end
if length(resp_arr) > 0
w = shift!(resp_arr)
id, rr = add_worker(PGRP, w)
#println("adding a worker $id $rr")
push!(ret, id)
push!(rr_join, rr)
end
end
#println("ret=$ret, rr_join=$rr_join")
for rr in rr_join
wait(rr)
end
#assert(length(ret) == np)
ret
end
addprocs(np::Integer; kwargs...) = addprocs_internal(np; kwargs...)
function addprocs(machines::AbstractVector; kwargs...)
manager_defined = any(x -> begin k,v = x; k==:manager end, kwargs)
if manager_defined
error("custom cluster managers unsupported on the ssh interface")
else
addprocs_internal(length(machines); manager=LSFManager(machines=machines), kwargs...)
end
end
end # module LSF
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment