# Gist by @amitmurthy, created March 26, 2015
type DArray{T,N,A} <: AbstractArray{T,N}
    dims::NTuple{N,Int}
    chunks::Array{RemoteRef,N}
    pmap::Array{Int,N}  # pmap[i]==p ⇒ processor p has piece i
    indexes::Array{NTuple{N,UnitRange{Int}},N}  # indexes held by piece i
    cuts::Vector{Vector{Int}}  # cuts[d][i] = first index of chunk i in dimension d

    function DArray(dims, chunks, pmap, indexes, cuts)
        # check invariants
        if size(chunks) != size(indexes)
            throw(ArgumentError("size(chunks) != size(indexes), $(repr(chunks)) != $(repr(indexes))"))
        elseif length(chunks) != length(pmap)
            throw(ArgumentError("length(chunks) != length(pmap), $(length(chunks)) != $(length(pmap))"))
        elseif dims != map(last, last(indexes))
            throw(ArgumentError("dimensions of DArray ($dims) and indexes do not match"))
        end
        return new(dims, chunks, reshape(pmap, size(chunks)), indexes, cuts)
    end
end
typealias SubDArray{T,N,D<:DArray} SubArray{T,N,D}
typealias SubOrDArray{T,N} Union(DArray{T,N}, SubDArray{T,N})
## core constructors ##
function DArray(init, dims, procs, dist)
    # dist == size(chunks)
    np = prod(dist)
    procs = procs[1:np]
    idxs, cuts = chunk_idxs([dims...], dist)
    chunks = Array(RemoteRef, dist...)
    for i = 1:np
        chunks[i] = remotecall(procs[i], init, idxs[i])
    end
    p = max(1, localpartindex(procs))
    A = remotecall_fetch(procs[p], r->typeof(fetch(r)), chunks[p])
    return DArray{eltype(A),length(dims),A}(dims, chunks, procs, idxs, cuts)
end
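# Usage sketch (not part of the original file; assumes 4 workers):
#     d = DArray(I -> fill(myid(), map(length, I)), (8, 8), workers()[1:4], [2, 2])
# `init` runs on each worker with `I`, the tuple of global index ranges its
# chunk covers, and returns that chunk as a local array.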
function DArray(init, dims, procs)
    if isempty(procs)
        throw(ArgumentError("no processors given"))
    end
    return DArray(init, dims, procs, defaultdist(dims, procs))
end
DArray(init, dims) = DArray(init, dims, workers()[1:min(nworkers(), maximum(dims))])
# new DArray similar to an existing one
DArray(init, d::DArray) = DArray(init, size(d), procs(d), [size(d.chunks)...])
macro DArray(ex::Expr)
    if ex.head !== :comprehension
        throw(ArgumentError("invalid @DArray syntax"))
    end
    ex.args[1] = esc(ex.args[1])
    ndim = length(ex.args) - 1
    ranges = map(r->esc(r.args[2]), ex.args[2:end])
    for d = 1:ndim
        var = ex.args[d+1].args[1]
        ex.args[d+1] = :( $(esc(var)) = ($(ranges[d]))[I[$d]] )
    end
    return :( DArray((I::(UnitRange{Int}...))->($ex),
                     tuple($(map(r->:(length($r)), ranges)...))) )
end
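# Example (illustrative): the comprehension form distributes construction over
# the default worker set,
#     d = @DArray [i + j for i = 1:100, j = 1:100]
# which is roughly equivalent to
#     DArray(I -> [i + j for i in I[1], j in I[2]], (100, 100))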
Base.similar(d::DArray, T, dims::Dims) = DArray(I->Array(T, map(length,I)), dims, procs(d))
Base.similar(d::DArray, T) = similar(d, T, size(d))
Base.similar{T}(d::DArray{T}, dims::Dims) = similar(d, T, dims)
Base.similar{T}(d::DArray{T}) = similar(d, T, size(d))
Base.size(d::DArray) = d.dims
@doc """
### procs(d::DArray)
Get the vector of processes storing pieces of DArray `d`.
""" ->
Base.procs(d::DArray) = d.pmap
chunktype{T,N,A}(d::DArray{T,N,A}) = A
## chunk index utilities ##
# decide how to divide each dimension
# returns size of chunks array
function defaultdist(dims, procs)
    dims = [dims...]
    chunks = ones(Int, length(dims))
    np = length(procs)
    f = sort!(collect(keys(factor(np))), rev=true)
    k = 1
    while np > 1
        # repeatedly allocate largest factor to largest dim
        if np % f[k] != 0
            k += 1
            if k > length(f)
                break
            end
        end
        fac = f[k]
        (d, dno) = findmax(dims)
        # resolve ties to highest dim
        dno = last(find(dims .== d))
        if dims[dno] >= fac
            dims[dno] = div(dims[dno], fac)
            chunks[dno] *= fac
        end
        np = div(np, fac)
    end
    return chunks
end
# get array of start indexes for dividing sz into nc chunks
function defaultdist(sz::Int, nc::Int)
    if sz >= nc
        return round(Int, linspace(1, sz+1, nc+1))
    else
        return [[1:(sz+1)], zeros(Int, nc-sz)]
    end
end
# compute indexes array for dividing dims into chunks
function chunk_idxs(dims, chunks)
    cuts = map(defaultdist, dims, chunks)
    n = length(dims)
    idxs = Array(NTuple{n,UnitRange{Int}}, chunks...)
    cartesianmap(tuple(chunks...)) do cidx...
        idxs[cidx...] = ntuple(n, i->(cuts[i][cidx[i]]:cuts[i][cidx[i]+1]-1))
    end
    return (idxs, cuts)
end
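# Worked example (illustrative): for a 2x2 process grid over an 8x8 array,
#     idxs, cuts = chunk_idxs([8, 8], [2, 2])
# gives cuts == [[1, 5, 9], [1, 5, 9]] and, e.g.,
# idxs[1,1] == (1:4, 1:4) and idxs[2,2] == (5:8, 5:8).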
function localpartindex(pmap::Array{Int})
    mi = myid()
    for i = 1:length(pmap)
        if pmap[i] == mi
            return i
        end
    end
    return 0
end
localpartindex(d::DArray) = localpartindex(d.pmap)
@doc """
### localpart(d)
Get the local piece of a distributed array.
Returns an empty array if no local part exists on the calling process.
""" ->
function localpart{T,N,A}(d::DArray{T,N,A})
    lpidx = localpartindex(d)
    if lpidx == 0
        return convert(A, Array(T, ntuple(N, i->0)))::A
    end
    return fetch(d.chunks[lpidx])::A
end
@doc """
### localindexes(d)
A tuple describing the indexes owned by the local process.
Returns a tuple with empty ranges if no local part exists on the calling process.
""" ->
function localindexes(d::DArray)
    lpidx = localpartindex(d)
    if lpidx == 0
        return ntuple(ndims(d), i->1:0)
    end
    return d.indexes[lpidx]
end
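# Usage sketch (assumes a DArray `d` and at least one worker):
#     remotecall_fetch(workers()[1],
#                      () -> size(localpart(d)) == map(length, localindexes(d)))
# On a process that stores no piece, localpart returns an empty array and
# localindexes returns a tuple of empty ranges (1:0).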
# find which piece holds index (I...)
locate(d::DArray, I::Int...) =
    ntuple(ndims(d), i->searchsortedlast(d.cuts[i], I[i]))
chunk{T,N,A}(d::DArray{T,N,A}, i...) = fetch(d.chunks[i...])::A
## convenience constructors ##
@doc """
### dzeros(dims, ...)
Construct a distributed array of zeros.
Trailing arguments are the same as those accepted by `DArray`.
""" ->
dzeros(args...) = DArray(I->zeros(map(length,I)), args...)
dzeros(d::Int...) = dzeros(d)
@doc """
### dones(dims, ...)
Construct a distributed array of ones.
Trailing arguments are the same as those accepted by `DArray`.
""" ->
dones(args...) = DArray(I->ones(map(length,I)), args...)
dones(d::Int...) = dones(d)
@doc """
### dfill(x, dims, ...)
Construct a distributed array filled with value `x`.
Trailing arguments are the same as those accepted by `DArray`.
""" ->
dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...)
dfill(v, d::Int...) = dfill(v, d)
@doc """
### drand(dims, ...)
Construct a distributed uniform random array.
Trailing arguments are the same as those accepted by `DArray`.
""" ->
drand(args...) = DArray(I->rand(map(length,I)), args...)
drand(d::Int...) = drand(d)
@doc """
### drandn(dims, ...)
Construct a distributed normal random array.
Trailing arguments are the same as those accepted by `DArray`.
""" ->
drandn(args...) = DArray(I->randn(map(length,I)), args...)
drandn(d::Int...) = drandn(d)
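# Usage sketch for the convenience constructors (assumes workers are available):
#     z = dzeros(100, 100)                           # distributed over all workers
#     o = dones((100, 100), workers()[1:4], [2, 2])  # explicit procs and layout
#     f = dfill(2.5, 100)
#     r = drandn(1000)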
## conversions ##
@doc """
### distribute(A[, procs])
Convert a local array to distributed.
`procs` optionally specifies a vector of process IDs to use (defaults to all workers).
""" ->
function distribute(A::AbstractArray;
                    procs=workers()[1:min(nworkers(), maximum(size(A)))])
    owner = myid()
    rr = RemoteRef()
    put!(rr, A)
    d = DArray(size(A), procs) do I
        remotecall_fetch(owner, ()->fetch(rr)[I...])
    end
    # Ensure that all workers have fetched their localparts.
    # Otherwise a gc in between can reclaim the RemoteRef rr.
    for chunk in d.chunks
        wait(chunk)
    end
    return d
end
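# Usage sketch:
#     A = rand(100, 100)
#     d = distribute(A)                          # spread A over all workers
#     d2 = distribute(A; procs=workers()[1:2])   # or over an explicit subset
#     convert(Array{Float64,2}, d) == A          # round-trip back to a local array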
Base.convert{S,T,N}(::Type{Array{S,N}}, d::DArray{T,N}) = begin
    a = Array(S, size(d))
    @sync begin
        for i = 1:length(d.chunks)
            @async a[d.indexes[i]...] = chunk(d, i)
        end
    end
    return a
end
Base.convert{S,T,N}(::Type{Array{S,N}}, s::SubDArray{T,N}) = begin
    I = s.indexes
    d = s.parent
    if isa(I, (UnitRange{Int}...)) && S<:T && T<:S
        l = locate(d, map(first, I)...)
        if isequal(d.indexes[l...], I)
            # SubDArray corresponds to a chunk
            return chunk(d, l...)
        end
    end
    a = Array(S, size(s))
    a[[1:size(a,i) for i=1:N]...] = s
    return a
end
Base.reshape{T,S<:Array}(A::DArray{T,1,S}, d::Dims) = begin
    if prod(d) != length(A)
        throw(DimensionMismatch("dimensions must be consistent with array size"))
    end
    return DArray(d) do I
        sz = map(length, I)
        d1offs = first(I[1])
        nd = length(I)
        B = Array(T, sz)
        nr = size(B, 1)
        sztail = size(B)[2:end]
        for i = 1:div(length(B), nr)
            i2 = ind2sub(sztail, i)
            globalidx = [ I[j][i2[j-1]] for j = 2:nd ]
            a = sub2ind(d, d1offs, globalidx...)
            B[:,i] = A[a:(a+nr-1)]
        end
        B
    end
end
## indexing ##
Base.getindex(r::RemoteRef, args...) = begin
    if r.where == myid()
        return getindex(fetch(r), args...)
    end
    return remotecall_fetch(r.where, getindex, r, args...)
end
function getindex_tuple{T}(d::DArray{T}, I::(Int...))
    chidx = locate(d, I...)
    chunk = d.chunks[chidx...]
    idxs = d.indexes[chidx...]
    localidx = ntuple(ndims(d), i->(I[i]-first(idxs[i])+1))
    return chunk[localidx...]::T
end
Base.getindex(d::DArray, i::Int) = getindex_tuple(d, ind2sub(size(d), i))
Base.getindex(d::DArray, i::Int...) = getindex_tuple(d, i)
Base.getindex(d::DArray) = d[1]
Base.getindex(d::DArray, I::Union(Int,UnitRange{Int})...) = sub(d, I...)
Base.copy!(dest::SubOrDArray, src::SubOrDArray) = begin
    if !(dest.dims == src.dims &&
         dest.pmap == src.pmap &&
         dest.indexes == src.indexes &&
         dest.cuts == src.cuts)
        throw(DimensionMismatch("destination array doesn't fit to source array"))
    end
    @sync for p in dest.pmap
        @spawnat p copy!(localpart(dest), localpart(src))
    end
    return dest
end
# local copies are obtained by convert(Array, d) or assigning from
# a SubDArray to a local Array.
Base.setindex!(a::Array, d::DArray, I::UnitRange{Int}...) = begin
    n = length(I)
    @sync for i = 1:length(d.chunks)
        K = d.indexes[i]
        @async a[[I[j][K[j]] for j=1:n]...] = chunk(d, i)
    end
    return a
end
Base.setindex!(a::Array, s::SubDArray, I::UnitRange{Int}...) = begin
    n = length(I)
    d = s.parent
    J = s.indexes
    if length(J) < n
        a[I...] = convert(Array, s)
        return a
    end
    offs = [isa(J[i],Int) ? J[i]-1 : first(J[i])-1 for i=1:n]
    @sync for i = 1:length(d.chunks)
        K_c = Any[d.indexes[i]...]
        K = [ intersect(J[j], K_c[j]) for j=1:n ]
        if !any(isempty, K)
            idxs = [ I[j][K[j]-offs[j]] for j=1:n ]
            if isequal(K, K_c)
                # whole chunk
                @async a[idxs...] = chunk(d, i)
            else
                # partial chunk
                ch = d.chunks[i]
                @async a[idxs...] =
                    remotecall_fetch(ch.where,
                                     ()->sub(fetch(ch),
                                             [K[j]-first(K_c[j])+1 for j=1:n]...))
            end
        end
    end
    return a
end
# to disambiguate
Base.setindex!(a::Array{Any}, d::SubOrDArray, i::Int) = arrayset(a, d, i)
Base.setindex!(a::Array, d::SubOrDArray, I::Union(Int,UnitRange{Int})...) =
    setindex!(a, d, [isa(i, Int) ? (i:i) : i for i in I]...)
Base.fill!(A::DArray, x) = begin
    @sync for p in procs(A)
        @spawnat p fill!(localpart(A), x)
    end
    return A
end
## higher-order functions ##
Base.map(f, d::DArray) = DArray(I->map(f, localpart(d)), d)
Base.reduce(f, d::DArray) =
    mapreduce(fetch, f,
              Any[ @spawnat p reduce(f, localpart(d)) for p in procs(d) ])
Base.mapreduce(f, opt::Function, d::DArray) =
    mapreduce(fetch, opt,
              Any[ @spawnat p mapreduce(f, opt, localpart(d)) for p in procs(d) ])
Base.map!(f, d::DArray) = begin
    @sync for p in procs(d)
        @spawnat p map!(f, localpart(d))
    end
    return d
end
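# Usage sketch for the distributed higher-order functions:
#     d = dones(1000)
#     reduce(+, d)            # each worker reduces its localpart, then combine
#     mapreduce(abs2, +, d)
#     m = map(x -> 2x, d)     # new DArray with the same layout as d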
# mapreducedim
Base.reducedim_initarray{R}(A::DArray, region, v0, ::Type{R}) = begin
    procsgrid = reshape(procs(A), size(A.indexes))
    gridsize = Base.reduced_dims(size(A.indexes), region)
    procsgrid = procsgrid[UnitRange{Int}[1:n for n = gridsize]...]
    return dfill(convert(R, v0), Base.reduced_dims(A, region), procsgrid, gridsize)
end
Base.reducedim_initarray{T}(A::DArray, region, v0::T) = Base.reducedim_initarray(A, region, v0, T)

Base.reducedim_initarray0{R}(A::DArray, region, v0, ::Type{R}) = begin
    procsgrid = reshape(procs(A), size(A.indexes))
    gridsize = Base.reduced_dims0(size(A.indexes), region)
    procsgrid = procsgrid[UnitRange{Int}[1:n for n = gridsize]...]
    return dfill(convert(R, v0), Base.reduced_dims0(A, region), procsgrid, gridsize)
end
Base.reducedim_initarray0{T}(A::DArray, region, v0::T) = Base.reducedim_initarray0(A, region, v0, T)
mapreducedim_within(f, op, A::DArray, region) = begin
    arraysize = [size(A)...]
    gridsize = [size(A.indexes)...]
    arraysize[[region...]] = gridsize[[region...]]
    return DArray(tuple(arraysize...), procs(A), tuple(gridsize...)) do I
        mapreducedim(f, op, localpart(A), region)
    end
end
function mapreducedim_between!(f, op, R::DArray, A::DArray, region)
    @sync for p in procs(R)
        @spawnat p begin
            localind = [r for r = localindexes(A)]
            localind[[region...]] = [1:n for n = size(A)[[region...]]]
            B = convert(Array, A[localind...])
            Base.mapreducedim!(f, op, localpart(R), B)
        end
    end
    return R
end
Base.mapreducedim!(f, op, R::DArray, A::DArray) = begin
    lsize = Base.check_reducedims(R, A)
    if isempty(A)
        return copy(R)
    end
    region = tuple(collect(1:ndims(A))[[size(R)...] .!= [size(A)...]]...)
    if isempty(region)
        return copy!(R, A)
    end
    B = mapreducedim_within(f, op, A, region)
    return mapreducedim_between!(identity, op, R, B, region)
end
Base.mapreducedim(f, op, A::DArray, region, v0) = begin
    Base.mapreducedim!(f, op, Base.reducedim_initarray(A, region, v0), A)
end
# LinAlg
Base.scale!(A::DArray, x::Number) = begin
    @sync for p in procs(A)
        @spawnat p scale!(localpart(A), x)
    end
    return A
end
function DArray(init, dims, procs, dist, distfunc::Function)
    np = prod(dist)
    procs = procs[1:np]
    idxs, cuts = distfunc([dims...], procs)
    chunks = Array(RemoteRef, dist...)
    for i = 1:np
        chunks[i] = remotecall(procs[i], init, idxs[i])
    end
    p = max(1, localpartindex(procs))
    A = remotecall_fetch(procs[p], r->typeof(fetch(r)), chunks[p])
    return DArray{eltype(A),length(dims),A}(dims, chunks, procs, idxs, cuts)
end
function mylu(a, b, c)
    n = size(a, 1)
    for i = 2:n
        l = c[i-1] / a[i-1]
        a[i] -= c[i-1] * l
        b[i] -= l * b[i-1]
    end
    b[n] /= a[n]
    for i = n-1:-1:1
        b[i] = (b[i] - c[i] * b[i+1]) / a[i]
    end
end
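# Correctness sketch for mylu (illustrative, not in the original): it is the
# Thomas algorithm for a symmetric tridiagonal system with diagonal `a`,
# sub/super-diagonal `c` and RHS `b`; the solution overwrites `b` (and the
# elimination overwrites `a`), so compare against a tridiagonal solve:
#     a = 4 + rand(6); c = rand(5); b = rand(6)
#     x_ref = Tridiagonal(c, copy(a), c) \ copy(b)
#     mylu(a, b, c)
#     maximum(abs(b - x_ref)) < 1e-10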
# backsolve
function usolve(local_ref, x1, x2, st)
    (Da, Db, Dc, Dd, Dl, Dx) = fetch(local_ref)
    usolve(Dc, Dd, Dl, Dx, x1, x2, st)
end

function usolve(Dc, Dd, Dl, Dx, x1, x2, st)
    t0 = time()
    c = localpart(Dc)
    d = localpart(Dd)
    l = localpart(Dl)
    x = localpart(Dx)
    n = size(x, 1)
    # x1, x2 are solutions at the next and previous separators
    x_ = x[n] = x1
    for i = n-1:-1:1
        x_ = x[i] = (x[i] - c[i+1] * x_ - l[i] * x2) / d[i]  # 5 flops
    end
    time()-t0, t0-st
end
# forward solve
function lsolve(local_ref, st)
    (Da, Db, Dc, Dd, Dl, Dx) = fetch(local_ref)
    lsolve(Da, Db, Dc, Dd, Dl, Dx, st)
end

function lsolve(Da, Db, Dc, Dd, Dl, Dx, st)
    t0 = time()
    a = localpart(Da)
    b = localpart(Db)
    c = localpart(Dc)
    d = localpart(Dd)
    l = localpart(Dl)
    x = localpart(Dx)
    n = size(x, 1)
    d_ = d[1] = a[1]
    x_ = x[1] = b[1]
    f = c[1]
    l[1] = f
    r = f / d_
    s1 = f * r
    x1 = x_ * r
    for i = 2:n-1
        c_ = c[i]
        l_ = c_ / d_          # 1
        d_ = a[i] - c_ * l_   # 2
        x_ = b[i] - x_ * l_   # 2
        f *= -l_              # 1, excluding negation for fillin
        r = f / d_            # 1
        s1 += f * r           # 2
        x1 += x_ * r          # 2
        l[i] = f              # store fillin
        d[i] = d_
        x[i] = x_
    end
    l_ = c[n] / d_
    d[n] = a[n] - c[n] * l_
    x[n] = b[n] - x_ * l_
    fp = -r * c[n]  # fillin between previous and next separators
    s1, x1, fp, d[n], x[n], time()-t0, t0-st
end
const PRELOAD_DARRAYS=1
const SEND_DARRAYS=2
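# Algorithm sketch (my reading of the code below): each processor eliminates
# its interior unknowns against its leading separator (lsolve), returning
# Schur-complement corrections (s1, x1) and a fill-in coupling fp to the next
# separator. The driver assembles the reduced tridiagonal system over the
# separators, solves it sequentially with mylu, then sends each processor its
# neighbouring separator values so it can back-substitute (usolve).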
function nd_p_s(Da, Db, Dc, Dd, Dl, Dx, procs, mode, local_refs)
    np = size(procs, 1)  # number of processors
    delays = []
    @assert np > 0
    rf = Array(Any, np)
    t0 = time()
    for i = 1:np
        p = procs[i]
        if mode == SEND_DARRAYS
            rf[i] = @spawnat p lsolve(Da, Db, Dc, Dd, Dl, Dx, t0)
        else
            rf[i] = @spawnat p lsolve(local_refs[i], t0)
        end
    end
    a = zeros(np)
    c = zeros(np - 1)
    x = zeros(np)
    ct = 0
    (s1, x1, f, a[1], x[1], ct, delay) = fetch(rf[1])
    push!(delays, [1, delay, 0])
    for i = 2:np
        (s1, x1, c[i-1], a[i], x[i], ctw, delay) = fetch(rf[i])
        a[i-1] -= s1
        x[i-1] -= x1
        ct += ctw
        push!(delays, [i, delay, 0])
    end
    mylu(a, x, c)
    p = procs[1]
    t0 = time()
    for i = 1:np
        xp = i == 1 ? 0 : x[i-1]
        p = procs[i]
        if mode == SEND_DARRAYS
            rf[i] = @spawnat p usolve(Dc, Dd, Dl, Dx, x[i], xp, t0)
        else
            rf[i] = @spawnat p usolve(local_refs[i], x[i], xp, t0)
        end
    end
    for (i, r) in enumerate(rf)
        (ctw, delay) = fetch(r)
        ct += ctw
        delays[i][3] = delay
    end
    ct, delays
end
function f1(dims, procs)  # auxiliary function to partition all vectors but c
    n = dims[1]
    P = size(procs, 1)
    leaf_size::Int64 = max(floor(n / P), 2)  # size of a leaf
    cuts = ones(Int, P + 1) * leaf_size
    cuts[1] = 1
    cuts = cumsum(cuts)
    cuts[end] = n + 1
    indexes = Array(Any, P)
    for i = 1:P
        indexes[i] = (cuts[i]:cuts[i+1]-1,)
    end
    indexes, Any[cuts]
end
function f2(dims, procs)  # auxiliary function to partition c
    n = dims[1]
    P = size(procs, 1)
    leaf_size::Int64 = max(floor((n - 1) / P), 2)  # size of a leaf
    cuts = ones(Int, P + 1) * leaf_size
    cuts[1] = 1
    cuts = cumsum(cuts)
    cuts[end] = n + 1
    indexes = Array(Any, P)
    for i = 1:P
        indexes[i] = (cuts[i]:cuts[i+1]-1,)
    end
    indexes, Any[cuts]
end
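# Worked example (illustrative): with n = 10 and P = 3, f1 computes
# leaf_size = 3, so cuts = [1, 4, 7, 11] and the pieces own indexes
# 1:3, 4:6 and 7:10 (the last leaf absorbs the remainder).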
function test_nd(n, procs::Array{Int64,1}, mode)
    P = size(procs, 1)  # number of processors
    Da = DArray(I->100 * rand(map(length,I)), (n,), procs, P, f1)
    Db = DArray(I->rand(map(length,I)), (n,), procs, P, f1)
    Dc = DArray(I->rand(map(length,I)), (n + 1,), procs, P, f2)
    Dx = DArray(I->zeros(map(length,I)), (n,), procs, P, f1)
    Dd = DArray(I->zeros(map(length,I)), (n,), procs, P, f1)
    Dl = DArray(I->zeros(map(length,I)), (n,), procs, P, f1)
    local_refs = []
    if mode == PRELOAD_DARRAYS
        for p in procs
            r = RemoteRef(p)
            put!(r, (Da, Db, Dc, Dd, Dl, Dx))
            push!(local_refs, r)
        end
    end
    elapsed_time = 0
    computational_time = 0
    delays = []
    elapsed_time = @elapsed begin
        computational_time, delays = nd_p_s(Da, Db, Dc, Dd, Dl, Dx, procs, mode, local_refs)
    end
    pcomp = (computational_time * 100) / (elapsed_time * length(procs))
    lsolve_delays = map(x->x[2], delays)
    usolve_delays = map(x->x[3], delays)
    lsolve_strs = map(x->@sprintf("%.4f", x), [minimum(lsolve_delays), maximum(lsolve_delays), mean(lsolve_delays)])
    usolve_strs = map(x->@sprintf("%.4f", x), [minimum(usolve_delays), maximum(usolve_delays), mean(usolve_delays)])
    # lsolve_strs = map(x->@sprintf("%.4f", x), [mean(lsolve_delays)])
    # usolve_strs = map(x->@sprintf("%.4f", x), [mean(usolve_delays)])
    string(" | ", mode == PRELOAD_DARRAYS ? "PRELOAD" : "SEND", " ", @sprintf("%.2f", elapsed_time), "s ", @sprintf("%.2f", pcomp), "%") *
        string(" [", join(lsolve_strs, " "), "], [") *
        string(join(usolve_strs, " "), "]")
end
function launch_n_procs(n)
    # pin each new worker to its own core with taskset, spreading
    # n workers evenly over the available cores
    r = n == 1 ? n : linrange(0, Sys.CPU_CORES-1, n)
    for i in r
        p = Int(round(i))
        addprocs(1; exename=`taskset -c $p $(joinpath(JULIA_HOME, Base.julia_exename()))`)
    end
end
if myid() == 1
    nw = parse(Int, ARGS[1])
    n = parse(Int, ARGS[2])
    if length(ARGS) > 2
        stp = parse(Int, ARGS[3]) * -1
    else
        stp = -4
    end
    # addprocs / rmprocs for each run to work around the memory leaks in DArrays
    println("n np | mode elapsed %comp [lsolve_delays usolve_delays] | mode elapsed %comp [lsolve_delays usolve_delays] ")
    for i in nw:stp:1
        launch_n_procs(i)
        for p in workers()
            @spawnat p include(@__FILE__)
        end
        test_nd(10^8, workers(), SEND_DARRAYS)  # compilation run
        send_result = test_nd(10^n, workers(), SEND_DARRAYS)
        println(10^n, " ", length(workers()), send_result)
        rmprocs(workers(); waitfor=1.0)

        # launch_n_procs(i)
        # for p in workers()
        #     @spawnat p include(@__FILE__)
        # end
        # test_nd(10^8, workers(), PRELOAD_DARRAYS) # compilation run
        # preload_result = test_nd(10^n, workers(), PRELOAD_DARRAYS)
        # println(10^n, " ", length(workers()), send_result, preload_result)
        # rmprocs(workers(); waitfor=1.0)
    end
end