Created
March 25, 2015 09:03
-
-
Save amitmurthy/b6d1e8e43742f58042ad to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Distributed array: an array of overall size `dims` split into chunks,
# each chunk living on one worker process behind a RemoteRef.
#   T - element type, N - number of dimensions, A - concrete type of local chunks
type DArray{T,N,A} <: AbstractArray{T,N}
    dims::NTuple{N,Int}                        # overall array dimensions
    chunks::Array{RemoteRef,N}                 # remote references to the pieces
    pmap::Array{Int,N}                         # pmap[i]==p ⇒ processor p has piece i
    indexes::Array{NTuple{N,UnitRange{Int}},N} # indexes held by piece i
    cuts::Vector{Vector{Int}}                  # cuts[d][i] = first index of chunk i in dimension d

    function DArray(dims, chunks, pmap, indexes, cuts)
        # check invariants: one index tuple per chunk, one process per
        # chunk, and the last chunk must end exactly at the array bounds
        if size(chunks) != size(indexes)
            throw(ArgumentError("size(chunks) != size(indexes), $(repr(chunks)) != $(repr(indexes))"))
        elseif length(chunks) != length(pmap)
            throw(ArgumentError("length(chunks) != length(pmap), $(length(chunks)) != $(length(pmap))"))
        elseif dims != map(last, last(indexes))
            throw(ArgumentError("dimension of DArray (dim) and indexes do not match"))
        end
        # reshape pmap so it always mirrors the chunk-grid layout
        return new(dims, chunks, reshape(pmap, size(chunks)), indexes, cuts)
    end
end
# A SubArray view of a DArray, and the union accepted where either works.
typealias SubDArray{T,N,D<:DArray} SubArray{T,N,D}
typealias SubOrDArray{T,N} Union(DArray{T,N}, SubDArray{T,N})

## core constructors ##
# Build a DArray by calling `init(index_range_tuple)` remotely on each piece.
#   init  - function mapping an index-range tuple to the local chunk
#   dims  - overall array dimensions
#   procs - candidate process ids (only the first prod(dist) are used)
#   dist  - number of chunks along each dimension
function DArray(init, dims, procs, dist)
    # dist == size(chunks)
    np = prod(dist)
    procs = procs[1:np]
    idxs, cuts = chunk_idxs([dims...], dist)
    chunks = Array(RemoteRef, dist...)
    for i = 1:np
        chunks[i] = remotecall(procs[i], init, idxs[i])
    end
    # determine the concrete chunk type A by asking one worker for
    # typeof(its chunk); prefer a locally held piece when one exists
    p = max(1, localpartindex(procs))
    A = remotecall_fetch(procs[p], r->typeof(fetch(r)), chunks[p])
    return DArray{eltype(A),length(dims),A}(dims, chunks, procs, idxs, cuts)
end
# Construct a DArray over `procs`, distributing `dims` with the default layout.
function DArray(init, dims, procs)
    isempty(procs) && throw(ArgumentError("no processors given"))
    return DArray(init, dims, procs, defaultdist(dims, procs))
end
# Default process set: up to one worker per element of the largest dimension.
DArray(init, dims) = DArray(init, dims, workers()[1:min(nworkers(), maximum(dims))])

# new DArray similar to an existing one (same size, processes and chunk grid)
DArray(init, d::DArray) = DArray(init, size(d), procs(d), [size(d.chunks)...])
# @DArray [f(i,j) for i=r1, j=r2] — distributed comprehension.
# Rewrites the comprehension so each worker evaluates only its own index
# block: each iteration variable is rebound to the caller's range sliced
# by the chunk's index tuple I.
macro DArray(ex::Expr)
    if ex.head !== :comprehension
        throw(ArgumentError("invalid @DArray syntax"))
    end
    ex.args[1] = esc(ex.args[1])   # comprehension body evaluates in caller scope
    ndim = length(ex.args) - 1
    ranges = map(r->esc(r.args[2]), ex.args[2:end])
    for d = 1:ndim
        var = ex.args[d+1].args[1]
        # rebind loop variable d to the caller's range restricted to this chunk
        ex.args[d+1] = :( $(esc(var)) = ($(ranges[d]))[I[$d]] )
    end
    return :( DArray((I::(UnitRange{Int}...))->($ex),
                tuple($(map(r->:(length($r)), ranges)...))) )
end
# similar: allocate a dense local chunk of the requested eltype/size on
# each of d's processes.
Base.similar(d::DArray, T, dims::Dims) = DArray(I->Array(T, map(length,I)), dims, procs(d))
Base.similar(d::DArray, T) = similar(d, T, size(d))
Base.similar{T}(d::DArray{T}, dims::Dims) = similar(d, T, dims)
Base.similar{T}(d::DArray{T}) = similar(d, T, size(d))

Base.size(d::DArray) = d.dims

@doc """
### procs(d::DArray)

Get the vector of processes storing pieces of DArray `d`.
""" ->
Base.procs(d::DArray) = d.pmap

# The concrete array type of the local chunks (the A in DArray{T,N,A}).
chunktype{T,N,A}(d::DArray{T,N,A}) = A
## chunk index utilities ##

# decide how to divide each dimension
# returns size of chunks array
function defaultdist(dims, procs)
    dims = [dims...]
    chunks = ones(Int, length(dims))
    np = length(procs)
    # prime factors of the process count, largest first
    f = sort!(collect(keys(factor(np))), rev=true)
    k = 1
    while np > 1
        # repeatedly allocate largest factor to largest dim
        if np % f[k] != 0
            k += 1
            if k > length(f)
                break
            end
        end
        fac = f[k]
        (d, dno) = findmax(dims)
        # resolve ties to highest dim
        dno = last(find(dims .== d))
        # only split if the dimension is large enough to hold the factor
        if dims[dno] >= fac
            dims[dno] = div(dims[dno], fac)
            chunks[dno] *= fac
        end
        np = div(np, fac)
    end
    return chunks
end
# get array of start indexes for dividing sz into nc chunks
function defaultdist(sz::Int, nc::Int)
    if sz >= nc
        # nc+1 evenly spaced cut points, from 1 through sz+1
        return round(Int, linspace(1, sz+1, nc+1))
    else
        # fewer elements than chunks: one element each, remaining chunks empty
        return [[1:(sz+1)], zeros(Int, nc-sz)]
    end
end
# compute indexes array for dividing dims into chunks
function chunk_idxs(dims, chunks)
    cuts = map(defaultdist, dims, chunks)   # cut points per dimension
    n = length(dims)
    idxs = Array(NTuple{n,UnitRange{Int}},chunks...)
    # for every position in the chunk grid, record the index ranges it owns
    cartesianmap(tuple(chunks...)) do cidx...
        idxs[cidx...] = ntuple(n, i->(cuts[i][cidx[i]]:cuts[i][cidx[i]+1]-1))
    end
    return (idxs, cuts)
end
# Position of the calling process in the process map, or 0 when the
# caller holds no piece.
function localpartindex(pmap::Array{Int})
    me = myid()
    for (idx, p) in enumerate(pmap)
        if p == me
            return idx
        end
    end
    return 0
end
localpartindex(d::DArray) = localpartindex(d.pmap) | |
@doc """
### localpart(d)

Get the local piece of a distributed array.
Returns an empty array if no local part exists on the calling process.
""" ->
function localpart{T,N,A}(d::DArray{T,N,A})
    lpidx = localpartindex(d)
    if lpidx == 0
        # not a participant: return a zero-sized array of the chunk type A
        return convert(A, Array(T, ntuple(N, i->0)))::A
    end
    return fetch(d.chunks[lpidx])::A
end
@doc """
### localindexes(d)

A tuple describing the indexes owned by the local process.
Returns a tuple with empty ranges if no local part exists on the calling process.
""" ->
function localindexes(d::DArray)
    lpidx = localpartindex(d)
    if lpidx == 0
        # not a participant: one empty range (1:0) per dimension
        return ntuple(ndims(d), i->1:0)
    end
    return d.indexes[lpidx]
end
# find which piece holds index (I...)
# searchsortedlast over the per-dimension cut points yields the chunk coordinate
locate(d::DArray, I::Int...) =
    ntuple(ndims(d), i->searchsortedlast(d.cuts[i], I[i]))

# Fetch chunk at grid coordinates i... and assert its concrete type A.
chunk{T,N,A}(d::DArray{T,N,A}, i...) = fetch(d.chunks[i...])::A
## convenience constructors ##

@doc """
### dzeros(dims, ...)

Construct a distributed array of zeros.
Trailing arguments are the same as those accepted by `DArray`.
""" ->
dzeros(args...) = DArray(I->zeros(map(length,I)), args...)
dzeros(d::Int...) = dzeros(d)   # vararg Int form forwards to the tuple form
@doc """
### dones(dims, ...)

Construct a distributed array of ones.
Trailing arguments are the same as those accepted by `DArray`.
""" ->
dones(args...) = DArray(I->ones(map(length,I)), args...)
dones(d::Int...) = dones(d)   # vararg Int form forwards to the tuple form
@doc """
### dfill(x, dims, ...)

Construct a distributed array filled with value `x`.
Trailing arguments are the same as those accepted by `DArray`.
""" ->
dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...)
dfill(v, d::Int...) = dfill(v, d)   # vararg Int form forwards to the tuple form

@doc """
### drand(dims, ...)

Construct a distributed uniform random array.
Trailing arguments are the same as those accepted by `DArray`.
""" ->
drand(args...) = DArray(I->rand(map(length,I)), args...)
drand(d::Int...) = drand(d)

@doc """
### drandn(dims, ...)

Construct a distributed normal random array.
Trailing arguments are the same as those accepted by `DArray`.
""" ->
drandn(args...) = DArray(I->randn(map(length,I)), args...)
drandn(d::Int...) = drandn(d)
## conversions ##

@doc """
### distribute(A[, procs])

Convert a local array to distributed.

`procs` optionally specifies a vector of process IDs to use. (defaults to all workers)
""" ->
function distribute(A::AbstractArray;
                    procs=workers()[1:min(nworkers(), maximum(size(A)))])
    owner = myid()
    # park A behind a RemoteRef so every worker can pull just its slice
    rr = RemoteRef()
    put!(rr, A)
    d = DArray(size(A), procs) do I
        remotecall_fetch(owner, ()->fetch(rr)[I...])
    end
    # Ensure that all workers have fetched their localparts.
    # Else a gc in between can recover the RemoteRef rr
    for chunk in d.chunks
        wait(chunk)
    end
    return d
end
# Gather a DArray into a local Array{S,N}: fetch every chunk
# concurrently and write each into its index block.
Base.convert{S,T,N}(::Type{Array{S,N}}, d::DArray{T,N}) = begin
    a = Array(S, size(d))
    @sync begin
        for i = 1:length(d.chunks)
            @async a[d.indexes[i]...] = chunk(d, i)
        end
    end
    return a
end
# Gather a SubDArray into a local Array. When the view lines up exactly
# with one chunk and no element-type conversion is needed (S == T via
# the mutual-subtype check), return the chunk itself without copying.
Base.convert{S,T,N}(::Type{Array{S,N}}, s::SubDArray{T,N}) = begin
    I = s.indexes
    d = s.parent
    if isa(I,(UnitRange{Int}...)) && S<:T && T<:S
        l = locate(d, map(first, I)...)
        if isequal(d.indexes[l...], I)
            # SubDArray corresponds to a chunk
            return chunk(d, l...)
        end
    end
    # general case: allocate and let setindex!(::Array, ::SubDArray) fill it
    a = Array(S, size(s))
    a[[1:size(a,i) for i=1:N]...] = s
    return a
end
# Reshape a 1-d DArray into shape d. Each destination chunk pulls the
# column-major linear ranges of A it needs, one destination column at a
# time (each destination column is contiguous in the source).
Base.reshape{T,S<:Array}(A::DArray{T,1,S}, d::Dims) = begin
    if prod(d) != length(A)
        throw(DimensionMismatch("dimensions must be consistent with array size"))
    end
    return DArray(d) do I
        sz = map(length,I)
        d1offs = first(I[1])       # global offset along the first dimension
        nd = length(I)
        B = Array(T,sz)
        nr = size(B,1)
        sztail = size(B)[2:end]
        for i=1:div(length(B),nr)
            # map the flat column counter to trailing-dimension coordinates
            i2 = ind2sub(sztail, i)
            globalidx = [ I[j][i2[j-1]] for j=2:nd ]
            # linear index in A of this column's first element
            a = sub2ind(d, d1offs, globalidx...)
            B[:,i] = A[a:(a+nr-1)]
        end
        B
    end
end
## indexing ##

# Index into a RemoteRef's value: fetch locally when the value lives on
# this process, otherwise index remotely and transfer only the result.
# NOTE(review): this adds a Base.getindex method for Base's RemoteRef
# type (type piracy) — acceptable in a script, risky in a library.
Base.getindex(r::RemoteRef, args...) = begin
    if r.where == myid()
        return getindex(fetch(r), args...)
    end
    return remotecall_fetch(r.where, getindex, r, args...)
end
# Scalar element access: locate the owning chunk, translate the global
# index to chunk-local coordinates, and fetch the single element.
function getindex_tuple{T}(d::DArray{T}, I::(Int...))
    chidx = locate(d, I...)
    chunk = d.chunks[chidx...]
    idxs = d.indexes[chidx...]
    localidx = ntuple(ndims(d), i->(I[i]-first(idxs[i])+1))
    return chunk[localidx...]::T
end
Base.getindex(d::DArray, i::Int) = getindex_tuple(d, ind2sub(size(d), i))  # linear indexing
Base.getindex(d::DArray, i::Int...) = getindex_tuple(d, i)
Base.getindex(d::DArray) = d[1]
# range indexing returns a lazy SubDArray view — no data movement here
Base.getindex(d::DArray, I::Union(Int,UnitRange{Int})...) = sub(d, I...)
# Copy between distributed arrays with identical layout: each owning
# process copies its own piece locally, in parallel.
# NOTE(review): the signature admits SubDArray, but the field accesses
# below (dest.dims, dest.pmap, ...) only exist on DArray — confirm the
# intended argument types.
Base.copy!(dest::SubOrDArray, src::SubOrDArray) = begin
    if !(dest.dims == src.dims &&
         dest.pmap == src.pmap &&
         dest.indexes == src.indexes &&
         dest.cuts == src.cuts)
        throw(DimensionMismatch("destination array doesn't fit to source array"))
    end
    @sync for p in dest.pmap
        @spawnat p copy!(localpart(dest), localpart(src))
    end
    return dest
end
# local copies are obtained by convert(Array, ) or assigning from
# a SubDArray to a local Array.

# a[I...] = d : fetch all chunks concurrently and store each into the
# corresponding sub-block of the destination ranges I.
Base.setindex!(a::Array, d::DArray, I::UnitRange{Int}...) = begin
    n = length(I)
    @sync for i = 1:length(d.chunks)
        K = d.indexes[i]
        @async a[[I[j][K[j]] for j=1:n]...] = chunk(d, i)
    end
    return a
end
# a[I...] = s : assign from a SubDArray. Only the chunks intersecting
# the view are touched; a whole-chunk overlap is fetched directly, a
# partial overlap is subset remotely before transfer.
Base.setindex!(a::Array, s::SubDArray, I::UnitRange{Int}...) = begin
    n = length(I)
    d = s.parent
    J = s.indexes
    if length(J) < n
        # view has fewer index dimensions: fall back to a full local conversion
        a[I...] = convert(Array,s)
        return a
    end
    # offsets mapping view coordinates into the parent's global coordinates
    offs = [isa(J[i],Int) ? J[i]-1 : first(J[i])-1 for i=1:n]
    @sync for i = 1:length(d.chunks)
        K_c = Any[d.indexes[i]...]                 # this chunk's global ranges
        K = [ intersect(J[j],K_c[j]) for j=1:n ]   # overlap with the view
        if !any(isempty, K)
            # destination indices in `a` corresponding to the overlap
            idxs = [ I[j][K[j]-offs[j]] for j=1:n ]
            if isequal(K, K_c)
                # whole chunk
                @async a[idxs...] = chunk(d, i)
            else
                # partial chunk
                ch = d.chunks[i]
                @async a[idxs...] =
                    remotecall_fetch(ch.where,
                                     ()->sub(fetch(ch),
                                             [K[j]-first(K_c[j])+1 for j=1:n]...))
            end
        end
    end
    return a
end
# to disambiguate
Base.setindex!(a::Array{Any}, d::SubOrDArray, i::Int) = arrayset(a, d, i)
# normalize scalar indices to one-element ranges and reuse the range method
Base.setindex!(a::Array, d::SubOrDArray, I::Union(Int,UnitRange{Int})...) =
    setindex!(a, d, [isa(i, Int) ? (i:i) : i for i in I ]...)
# Fill every local piece with x, one task per owning process.
function Base.fill!(A::DArray, x)
    @sync begin
        for p in procs(A)
            @spawnat p fill!(localpart(A), x)
        end
    end
    return A
end
## higher-order functions ##

# map: apply f chunk-by-chunk, producing a DArray with the same layout
Base.map(f, d::DArray) = DArray(I->map(f, localpart(d)), d)

# reduce/mapreduce: reduce each local part remotely, then fold the
# per-process partial results on the caller via fetch
Base.reduce(f, d::DArray) =
    mapreduce(fetch, f,
              Any[ @spawnat p reduce(f, localpart(d)) for p in procs(d) ])

Base.mapreduce(f, opt::Function, d::DArray) =
    mapreduce(fetch, opt,
              Any[ @spawnat p mapreduce(f, opt, localpart(d)) for p in procs(d) ])
# In-place map over each local piece, one task per owning process.
function Base.map!(f, d::DArray)
    @sync begin
        for p in procs(d)
            @spawnat p map!(f, localpart(d))
        end
    end
    return d
end
# mapreducedim

# Allocate the reduction output as a DArray: collapse the process grid
# along the reduced dimensions and fill the result with v0.
Base.reducedim_initarray{R}(A::DArray, region, v0, ::Type{R}) = begin
    procsgrid = reshape(procs(A), size(A.indexes))
    gridsize = Base.reduced_dims(size(A.indexes), region)
    procsgrid = procsgrid[UnitRange{Int}[1:n for n = gridsize]...]
    return dfill(convert(R, v0), Base.reduced_dims(A, region), procsgrid, gridsize)
end
Base.reducedim_initarray{T}(A::DArray, region, v0::T) = Base.reducedim_initarray(A, region, v0, T)
# Same as reducedim_initarray, but using reduced_dims0 (reduced
# dimensions of size 0 stay 0 rather than becoming 1).
Base.reducedim_initarray0{R}(A::DArray, region, v0, ::Type{R}) = begin
    procsgrid = reshape(procs(A), size(A.indexes))
    gridsize = Base.reduced_dims0(size(A.indexes), region)
    procsgrid = procsgrid[UnitRange{Int}[1:n for n = gridsize]...]
    return dfill(convert(R, v0), Base.reduced_dims0(A, region), procsgrid, gridsize)
end
Base.reducedim_initarray0{T}(A::DArray, region, v0::T) = Base.reducedim_initarray0(A, region, v0, T)
# First reduction stage: reduce within each local chunk. The result is
# a DArray whose reduced dimensions hold one element per chunk-grid row.
mapreducedim_within(f, op, A::DArray, region) = begin
    arraysize = [size(A)...]
    gridsize = [size(A.indexes)...]
    # reduced dimensions shrink to the number of chunks along them
    arraysize[[region...]] = gridsize[[region...]]
    return DArray(tuple(arraysize...), procs(A), tuple(gridsize...)) do I
        mapreducedim(f, op, localpart(A), region)
    end
end
# Second reduction stage: fold the per-chunk partial results of A into
# R. Each process of R pulls the partials that reduce into its piece.
function mapreducedim_between!(f, op, R::DArray, A::DArray, region)
    @sync for p in procs(R)
        @spawnat p begin
            localind = [r for r = localindexes(A)]
            # along reduced dimensions, take the full extent of A
            localind[[region...]] = [1:n for n = size(A)[[region...]]]
            B = convert(Array, A[localind...])
            Base.mapreducedim!(f, op, localpart(R), B)
        end
    end
    return R
end
# Reduce A into the pre-allocated distributed result R. The reduced
# dimensions are inferred from where size(R) and size(A) differ.
Base.mapreducedim!(f, op, R::DArray, A::DArray) = begin
    lsize = Base.check_reducedims(R,A)
    if isempty(A)
        return copy(R)
    end
    region = tuple(collect(1:ndims(A))[[size(R)...] .!= [size(A)...]]...)
    if isempty(region)
        # no dimension is reduced: plain copy
        return copy!(R, A)
    end
    # two-stage reduction: within chunks, then between chunks
    B = mapreducedim_within(f, op, A, region)
    return mapreducedim_between!(identity, op, R, B, region)
end
# Out-of-place distributed reduction of A over `region`, seeded with v0.
# (The original method's body referenced `region` and `v0` without ever
# binding them, so any call raised an UndefVarError; the signature now
# takes them explicitly, matching Base's mapreducedim API.)
Base.mapreducedim(f, op, A::DArray, region, v0) = begin
    Base.mapreducedim!(f, op, Base.reducedim_initarray(A, region, v0), A)
end
# LinAlg

# Scale every local piece in place by x, one task per owning process.
function Base.scale!(A::DArray, x::Number)
    @sync begin
        for p in procs(A)
            @spawnat p scale!(localpart(A), x)
        end
    end
    return A
end
# Constructor variant with a user-supplied partitioning function:
# distfunc([dims...], procs) must return (indexes, cuts) in the same
# shapes that chunk_idxs produces.
function DArray(init, dims, procs, dist, distfunc::Function)
    np = prod(dist)
    procs = procs[1:np]
    idxs, cuts = distfunc([dims...], procs)
    chunks = Array(RemoteRef, dist...)
    for i = 1:np
        chunks[i] = remotecall(procs[i], init, idxs[i])
    end
    # infer the concrete chunk type from one (preferably local) piece
    p = max(1, localpartindex(procs))
    A = remotecall_fetch(procs[p], r->typeof(fetch(r)), chunks[p])
    DArray{eltype(A),length(dims),A}(dims, chunks, procs, idxs, cuts)
end
# In-place solve of a symmetric tridiagonal system (Thomas algorithm):
# on entry `a` is the diagonal, `c` the off-diagonal, `b` the right-hand
# side. On exit `b` holds the solution; `a` and `b` are overwritten.
function mylu(a, b, c)
    n = size(a, 1)
    # forward elimination
    for i = 2:n
        m = c[i-1] / a[i-1]
        a[i] -= c[i-1] * m
        b[i] -= m * b[i-1]
    end
    # back substitution
    b[n] /= a[n]
    for i = n-1:-1:1
        b[i] = (b[i] - c[i] * b[i+1]) / a[i]
    end
end
#backsolve
# Wrapper for PRELOAD mode: the six DArray handles were parked in a
# RemoteRef on this worker; unpack them and run the real backsolve.
function usolve(local_ref, x1, x2, st)
    (Da, Db, Dc, Dd, Dl, Dx) = fetch(local_ref)
    usolve(Dc, Dd, Dl, Dx, x1, x2, st)
end
# Backward substitution on this worker's piece.
#   x1, x2 - solution values at the next and previous separators
#   st     - caller's timestamp, used to report task start delay
# Returns (local compute time, delay before this task started).
function usolve(Dc, Dd, Dl, Dx, x1, x2, st)
    t0 = time()
    c = localpart(Dc)
    d = localpart(Dd)
    l = localpart(Dl)
    x = localpart(Dx)
    n = size(x, 1)
    #x1, x2 are solutions at the next and previous separators
    x_ = x[n] = x1
    for i = n-1:-1:1
        x_ = x[i] = (x[i] - c[i+1] * x_ - l[i] * x2) / d[i] # 5 flops
    end
    time()-t0, t0-st
end
#forward solve
# Wrapper for PRELOAD mode: unpack the DArray handles parked on this
# worker and run the real forward solve.
function lsolve(local_ref, st)
    (Da, Db, Dc, Dd, Dl, Dx) = fetch(local_ref)
    lsolve(Da, Db, Dc, Dd, Dl, Dx, st)
end
# Forward elimination on this worker's piece of the tridiagonal system.
#   Da - diagonal, Db - rhs, Dc - off-diagonal (inputs)
#   Dd - eliminated diagonal, Dl - fill-in, Dx - eliminated rhs (outputs)
#   st - caller's timestamp, for reporting task start delay
# Returns (s1, x1, fp, d[n], x[n], compute time, start delay): this
# piece's contributions to the small separator system solved by mylu.
# Trailing numeric comments are flop counts per line.
function lsolve(Da, Db, Dc, Dd, Dl, Dx, st)
    t0 = time()
    a = localpart(Da)
    b = localpart(Db)
    c = localpart(Dc)
    d = localpart(Dd)
    l = localpart(Dl)
    x = localpart(Dx)
    n = size(x,1)
    d_ = d[1] = a[1]
    x_ = x[1] = b[1]
    f = c[1]
    l[1] = f
    r = f / d_
    s1 = f * r
    x1 = x_ * r
    for i = 2 : n - 1
        c_ = c[i]
        l_ = c_ / d_          # 1
        d_ = a[i] - c_ * l_   # 2
        x_ = b[i] - x_ * l_   # 2
        f *= -l_              # 1 excluding negation for fillin
        r = f / d_            # 1
        s1 += f * r           # 2
        x1 += x_ * r          # 2
        l[i] = f              # store fillin
        d[i] = d_
        x[i] = x_
    end
    # last row handled outside the loop
    l_ = c[n] / d_
    d[n] = a[n] - c[n] * l_
    x[n] = b[n] - x_ * l_
    fp = - r * c[n] # fillin between previous and next separators
    s1, x1, fp, d[n], x[n], time()-t0, t0-st
end
# Run modes: PRELOAD parks the DArray handles in a RemoteRef on each
# worker up front; SEND ships them with every remote call.
const PRELOAD_DARRAYS=1
const SEND_DARRAYS=2
# Parallel solve driver: run lsolve on every worker, solve the small
# separator system locally with mylu, then run usolve on every worker.
# `mode` selects SEND (ship DArray handles per call) or PRELOAD (fetch
# them from local_refs). Returns (summed worker compute time, and per
# worker an entry [index, lsolve start delay, usolve start delay]).
function nd_p_s(Da, Db, Dc, Dd, Dl, Dx, procs, mode, local_refs)
    np = size(procs,1) # no of processors
    delays = []
    assert(np > 0)
    rf = Array(Any, np)
    t0 = time()
    # phase 1: forward elimination on each worker
    for i = 1:np
        p = procs[i]
        if mode == SEND_DARRAYS
            rf[i] = @spawnat p lsolve(Da, Db, Dc, Dd, Dl, Dx, t0)
        else
            rf[i] = @spawnat p lsolve(local_refs[i], t0)
        end
    end
    # assemble the separator system from each worker's contributions
    a = zeros(np)
    c = zeros(np -1)
    x = zeros(np)
    ct = 0
    (s1, x1, f, a[1], x[1], ct, delay) = fetch(rf[1])
    push!(delays, [1, delay, 0])
    for i = 2:np
        (s1, x1, c[i - 1], a[i], x[i], ctw, delay) = fetch(rf[i])
        a[i - 1] -= s1
        x[i - 1] -= x1
        ct += ctw
        push!(delays, [i, delay, 0])
    end
    mylu(a, x, c)   # solve the separator system in place (solution in x)
    p = procs[1]
    t0 = time()
    # phase 2: back substitution on each worker
    for i = 1:np
        xp = i==1 ? 0 : x[i-1]
        p = procs[i]
        if mode == SEND_DARRAYS
            rf[i] = @spawnat p usolve(Dc, Dd, Dl, Dx, x[i], xp, t0)
        else
            rf[i] = @spawnat p usolve(local_refs[i], x[i], xp, t0)
        end
    end
    for (i, r) in enumerate(rf)
        (ctw,delay) = fetch(r)
        ct += ctw
        delays[i][3] = delay
    end
    ct, delays
end
# auxiliary function to partition all vectors but c
# Splits dims[1] elements into P contiguous leaves of (roughly) equal
# size and returns (indexes, cuts) in the shape expected by the
# distfunc-taking DArray constructor.
function f1(dims, procs)
    n = dims[1]
    P = size(procs, 1)
    leaf_size::Int64 = max(floor(n / P), 2) #size of a leaf
    cuts = ones(Int, P + 1) * leaf_size
    cuts[1] = 1
    cuts = cumsum(cuts)
    cuts[end] = n + 1   # last leaf absorbs any rounding remainder
    indexes = Array(Any, P)
    for i = 1:P
        indexes[i] = (cuts[i] : cuts[i+1] - 1,)
    end
    indexes, Any[cuts]
end
# auxiliary function to partition c
# Same structure as f1 (contiguous leaves plus cut points), differing
# only in the leaf-size computation, since c carries one extra element.
function f2(dims, procs)
    n = dims[1]
    P = size(procs, 1)
    leaf_size::Int64 = max(floor((n - 1) / P), 2) #size of a leaf
    cuts = ones(Int, P + 1) * leaf_size
    cuts[1] = 1
    cuts = cumsum(cuts)
    cuts[end] = n + 1   # last leaf absorbs any rounding remainder
    indexes = Array(Any, P)
    for i = 1:P
        indexes[i] = (cuts[i] : cuts[i+1] - 1,)
    end
    indexes, Any[cuts]
end
# Build the distributed vectors for a size-n system, run the solve in
# the given mode, and return a formatted timing-summary string.
function test_nd(n, procs::Array{Int64, 1}, mode)
    P = size(procs, 1) # # of processors
    Da = DArray(I->100 * rand(map(length,I)), (n,), procs, P, f1)  # diagonal (scaled up)
    Db = DArray(I->rand(map(length,I)), (n,), procs, P, f1)        # right-hand side
    Dc = DArray(I->rand(map(length,I)), (n + 1,), procs, P, f2)    # off-diagonal
    Dx = DArray(I->zeros(map(length,I)), (n,), procs, P, f1)       # solution
    Dd = DArray(I->zeros(map(length,I)), (n,), procs, P, f1)       # eliminated diagonal
    Dl = DArray(I->zeros(map(length,I)), (n,), procs, P, f1)       # fill-in
    local_refs=[]
    if mode == PRELOAD_DARRAYS
        # park the DArray handles on each worker once, up front
        for p in procs
            r = RemoteRef(p)
            put!(r, (Da, Db, Dc, Dd, Dl, Dx))
            push!(local_refs, r)
        end
    end
    elapsed_time = 0
    computational_time = 0
    delays=[]
    elapsed_time = @elapsed begin
        computational_time, delays = nd_p_s(Da, Db, Dc, Dd, Dl, Dx, procs, mode, local_refs)
    end
    # percentage of wall time that workers spent computing
    pcomp = (computational_time * 100) / (elapsed_time * length(procs))
    lsolve_delays = map(x->x[2], delays)
    usolve_delays = map(x->x[3], delays)
    # lsolve_strs = map(x->@sprintf("%.4f", x), [minimum(lsolve_delays), maximum(lsolve_delays), mean(lsolve_delays)])
    # usolve_strs = map(x->@sprintf("%.4f", x), [minimum(usolve_delays), maximum(usolve_delays), mean(usolve_delays)])
    lsolve_strs = map(x->@sprintf("%.4f", x), [mean(lsolve_delays)])
    usolve_strs = map(x->@sprintf("%.4f", x), [mean(usolve_delays)])
    # summary: " | MODE <elapsed>s <pcomp>% [<lsolve avg>,<usolve avg>]"
    string(" | ", mode == PRELOAD_DARRAYS ? "PRELOAD" : "SEND", " ", @sprintf("%.2f", elapsed_time), "s ", @sprintf("%.2f", pcomp), "%") *
    string(" [", join(lsolve_strs, " "), ",") *
    string(join(usolve_strs, " "), "]")
end
# Benchmark driver (runs on the master only).
#   ARGS[1] - starting number of workers, ARGS[2] - size exponent n
#   ARGS[3] - optional step for decreasing the worker count (default 4)
# Compares SEND vs PRELOAD for each worker count.
if myid() == 1
    nw = parse(Int, ARGS[1])
    n = parse(Int, ARGS[2])
    if length(ARGS) > 2
        stp = parse(Int, ARGS[3]) * -1   # negative step: count workers downward
    else
        stp = -4
    end
    # addprocs / rmprocs for each run to work around the memory leaks in DArrays
    println("n np | mode elapsed %comp [lsolve_delay_avg usolve_delay_avg] | mode elapsed %comp [lsolve_delay_avg usolve_delay_avg] ")
    for i in nw:stp:1
        addprocs(i)
        for p in workers()
            @spawnat p include(@__FILE__)   # load the definitions on fresh workers
        end
        # NOTE(review): the warm-up uses 10^8 elements — presumably
        # intentional, but confirm it wasn't meant to be a small size.
        test_nd(10^8, workers(), SEND_DARRAYS) # compilation run
        send_result = test_nd(10^n, workers(), SEND_DARRAYS)
        rmprocs(workers(); waitfor=1.0)
        addprocs(i)
        for p in workers()
            @spawnat p include(@__FILE__)
        end
        test_nd(10^8, workers(), PRELOAD_DARRAYS) # compilation run
        preload_result = test_nd(10^n, workers(), PRELOAD_DARRAYS)
        println(10^n, " ", length(workers()), send_result, preload_result)
        rmprocs(workers(); waitfor=1.0)
    end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment