-
-
Save jpsamaroo/95c78b3361ae454a51916183f2cf346f to your computer and use it in GitHub Desktop.
using Dagger | |
using Profile | |
fibsum(a, b) = .3 .* a .+ .7 .* b | |
function compute_sn_aot(N) | |
f0 = delayed(rand)(10) | |
f1 = delayed(rand)(10) | |
s = 0. # result held here | |
even = [f0] | |
for i in 1:N | |
f2 = delayed(fibsum)(f0, f1) | |
f0, f1 = f1, f2 | |
if i%2 == 0 | |
push!(even, f2) | |
end | |
end | |
even = compute.(even) | |
collect(delayed(sum)(even)) | |
end | |
function compute_sn_jit(N) | |
f0 = Dagger.@spawn rand(10) | |
f1 = Dagger.@spawn rand(10) | |
s = 0. # result held here | |
even = [f0] | |
for i in 1:N | |
f2 = Dagger.@spawn fibsum(f0, f1) | |
f0, f1 = f1, f2 | |
if i%2 == 0 | |
push!(even, f2) | |
end | |
end | |
even = fetch.(even) | |
fetch(Dagger.@spawn sum(even)) | |
end | |
function compute_yn_aot(N) | |
f0 = delayed(rand)(10) | |
f1 = delayed(rand)(10) | |
f2 = 0. # results here | |
for i in 1:N | |
f2 = delayed(fibsum)(f0, f1) | |
f0, f1 = f1, f2 | |
end | |
return collect(f2) | |
end | |
function compute_yn_jit(N) | |
f0 = Dagger.@spawn rand(10) | |
f1 = Dagger.@spawn rand(10) | |
f2 = 0. # results here | |
for i in 1:N | |
f2 = Dagger.@spawn fibsum(f0, f1) | |
f0, f1 = f1, f2 | |
end | |
return fetch(f2) | |
end | |
# Warm-up the execution and GC code paths | |
compute_sn_aot(10) | |
compute_sn_jit(10) | |
compute_yn_aot(10) | |
compute_yn_jit(10) | |
GC.gc() | |
sleep(3) | |
# Change this to `true` to enable runtime and allocation profiling! | |
const profile = false | |
profile && Profile.init(; n=Profile.init()[1], delay=0.0001) | |
const alloc_profile = profile && VERSION >= v"1.8-" | |
if profile | |
Ns = [2_000] | |
else | |
Ns = [1_000, 5_000, 10_000, 100_000, 200_000, 500_000] | |
end | |
for N in Ns | |
print("s_n @ $N (AOT): ") | |
@time compute_sn_aot(N) | |
profile && Profile.@profile compute_sn_aot(N) | |
alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_sn_aot(N) | |
print("s_n @ $N (JIT): ") | |
@time compute_sn_jit(N) | |
profile && Profile.@profile compute_sn_jit(N) | |
alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_sn_jit($N) | |
print("y_n @ $N (AOT): ") | |
@time compute_yn_aot(N) | |
profile && Profile.@profile compute_yn_aot(N) | |
alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_yn_aot(N) | |
print("y_n @ $N (JIT): ") | |
@time compute_yn_jit(N) | |
profile && Profile.@profile compute_yn_jit(N) | |
alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_yn_jit($N) | |
end | |
using Printf | |
function print_allocs(allocs; minsize=64, raw=false) | |
allocs = sort(filter(a->a.size >= minsize, allocs.allocs); by=a->a.size) | |
if raw | |
for alloc in allocs | |
@printf "%10s [%s]\n" Base.format_bytes(alloc.size) alloc.type | |
end | |
else | |
per_type = Dict{Type,Int}() | |
for alloc in allocs | |
per_type[alloc.type] = get(per_type, alloc.type, 0) + alloc.size | |
end | |
for T_alloc in sort([per_type...]; by=x->x[2]) | |
@printf "%10s [%s]\n" Base.format_bytes(T_alloc[2]) T_alloc[1] | |
end | |
end | |
end | |
if profile | |
const results = Profile.retrieve() | |
Profile.print(results...; mincount=500, maxdepth=20, C=true) | |
end | |
if alloc_profile | |
const allocs = Profile.Allocs.fetch() | |
print_allocs(allocs) | |
end |
import dask | |
import numpy as np | |
import timeit | |
@dask.delayed | |
def fib0(n) : | |
return np.random.rand(10) | |
@dask.delayed | |
def wsum(a, b) : | |
return (.3*a + .7*b) | |
def compute_sn(N): | |
f0 = fib0(1) | |
f1 = fib0(2) | |
even = [f0] | |
for i in range(0, N) : | |
f2 = wsum(f0, f1) | |
f0, f1 = f1, f2 | |
if (i%2 == 1) : | |
even.append(f2) | |
v = dask.delayed(sum)(even) | |
v.compute() | |
def compute_yn(N): | |
f0 = fib0(1) | |
f1 = fib0(2) | |
f2 = None | |
for i in range(0, N) : | |
f2 = wsum(f0, f1) | |
f0, f1 = f1, f2 | |
f2.compute() | |
for N in [1000, 5000, 10000]: | |
print("s_n @ %d: %s" % (N, timeit.timeit(lambda: compute_sn(N), number=3))) | |
print("y_n @ %d: %s" % (N, timeit.timeit(lambda: compute_yn(N), number=3))) |
now i got a different error with julia 1.7.3 + Dagger#master:
julius@julius:~$ /usr/local/julia-1.7.3/bin/julia
_
_ _ ()_ | Documentation: https://docs.julialang.org
() | () () |
_ _ | | __ _ | Type "?" for help, "]?" for Pkg help.
| | | | | | |/ ` | |
| | || | | | (| | | Version 1.7.3 (2022-05-06)
/ |_'|||_'_| | Official https://julialang.org/ release
|__/ |
julia> using Dagger
[ Info: Precompiling Dagger [d58978e5-989f-55fb-8d15-ea34adc7bf54]
julia> N = 10
10
julia> f0 = Dagger.@Spawn rand(10)
Error in eager scheduler:
ThunkFailedException (Thunk[1](eager_thunk, Any[]) failure):
jl_set_task_tid == 0
Stacktrace:
[1] error(s::String)
@ Base ./error.jl:33
[2] execute!(::Dagger.ThreadProc, ::Any)
@ Dagger ~/dev/Julius/Dagger/src/processor.jl:173
[3] #126
@ ~/dev/Julius/Dagger/src/sch/Sch.jl:1058 [inlined]
[4] (::Dagger.var"#62#63"{Dagger.Sch.var"#126#135"{Dagger.ThreadProc, Vector{Any}}})()
@ Dagger ~/dev/Julius/Dagger/src/options.jl:7
[5] with_logstate(f::Function, logstate::Any)
@ Base.CoreLogging ./logging.jl:511
[6] with_logger(f::Function, logger::ContextVariablesX.ContextPayloadLogger)
@ Base.CoreLogging ./logging.jl:623
[7] with_task_ctxvars(f::Any, ctx::Any)
@ ContextVariablesX ~/.julia/packages/ContextVariablesX/wzJaf/src/payloadlogger.jl:16
[8] with_context(f::Function, kvs::Pair{ContextVariablesX.ContextVar{NamedTuple}, NamedTuple{(), Tuple{}}})
@ ContextVariablesX ~/.julia/packages/ContextVariablesX/wzJaf/src/ContextVariablesX.jl:336
[9] with_options(f::Dagger.Sch.var"#126#135"{Dagger.ThreadProc, Vector{Any}}, options::NamedTuple{(), Tuple{}})
@ Dagger ~/dev/Julius/Dagger/src/options.jl:6
[10] do_task(to_proc::Dagger.ThreadProc, comm::Vector{Any})
@ Dagger.Sch ~/dev/Julius/Dagger/src/sch/Sch.jl:1056
[11] macro expansion
@ ~/dev/Julius/Dagger/src/sch/Sch.jl:942 [inlined]
[12] (::Dagger.Sch.var"#116#118"{Dagger.ThreadProc, Distributed.RemoteChannel{Channel{Any}}, Vector{Any}})()
@ Dagger.Sch ./task.jl:429
Stacktrace:
[1] compute_dag(ctx::Context, d::Thunk; options::Dagger.Sch.SchedulerOptions)
@ Dagger.Sch ~/dev/Julius/Dagger/src/sch/Sch.jl:384
[2] compute(ctx::Context, d::Thunk; options::Dagger.Sch.SchedulerOptions)
@ Dagger ~/dev/Julius/Dagger/src/compute.jl:31
[3] (::Dagger.Sch.var"#51#52"{Context})()
@ Dagger.Sch ./task.jl:429
^CERROR: InterruptException:
Stacktrace:
[1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
@ Base ./task.jl:812
[2] wait()
@ Base ./task.jl:872
[3] wait(c::Base.GenericCondition{ReentrantLock})
@ Base ./condition.jl:123
[4] fetch_buffered(c::Channel{Any})
@ Base ./channels.jl:365
[5] fetch(c::Channel{Any})
@ Base ./channels.jl:359
[6] fetch(r::Distributed.Future)
@ Distributed /usr/local/julia-1.7.3/share/julia/stdlib/v1.7/Distributed/src/remotecall.jl:620
[7] _spawn(f::Function, args::Int64; options::NamedTuple{(), Tuple{}}, kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Dagger ~/dev/Julius/Dagger/src/thunk.jl:255
[8] spawn(f::Function, args::Int64; processor::Nothing, scope::Nothing, kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Dagger ~/dev/Julius/Dagger/src/thunk.jl:275
[9] spawn(f::Function, args::Int64)
@ Dagger ~/dev/Julius/Dagger/src/thunk.jl:268
[10] top-level scope
@ ~/dev/Julius/Dagger/src/thunk.jl:334
@juliusrules can you get me a stacktrace of where this is stuck? I usually send a
SIGTERM
to the process (instead of Ctrl+C) when I hit a deadlock to kill it and dump the stacktrace.