using Dagger
using Profile

# Weighted-sum recurrence: each term mixes the two previous terms
fibsum(a, b) = .3 .* a .+ .7 .* b

# "AOT" style: build a lazy graph with `delayed`, execute on `compute`/`collect`
function compute_sn_aot(N)
    f0 = delayed(rand)(10)
    f1 = delayed(rand)(10)
    even = [f0]
    for i in 1:N
        f2 = delayed(fibsum)(f0, f1)
        f0, f1 = f1, f2
        if i % 2 == 0
            push!(even, f2)
        end
    end
    even = compute.(even)
    collect(delayed(sum)(even))
end
# "JIT" style: `Dagger.@spawn` schedules each task eagerly; `fetch` blocks on it
function compute_sn_jit(N)
    f0 = Dagger.@spawn rand(10)
    f1 = Dagger.@spawn rand(10)
    even = [f0]
    for i in 1:N
        f2 = Dagger.@spawn fibsum(f0, f1)
        f0, f1 = f1, f2
        if i % 2 == 0
            push!(even, f2)
        end
    end
    even = fetch.(even)
    fetch(Dagger.@spawn sum(even))
end
function compute_yn_aot(N)
    f0 = delayed(rand)(10)
    f1 = delayed(rand)(10)
    f2 = f1  # will hold the final thunk
    for i in 1:N
        f2 = delayed(fibsum)(f0, f1)
        f0, f1 = f1, f2
    end
    return collect(f2)
end

function compute_yn_jit(N)
    f0 = Dagger.@spawn rand(10)
    f1 = Dagger.@spawn rand(10)
    f2 = f1  # will hold the final thunk
    for i in 1:N
        f2 = Dagger.@spawn fibsum(f0, f1)
        f0, f1 = f1, f2
    end
    return fetch(f2)
end
# Warm up the execution and GC code paths
compute_sn_aot(10)
compute_sn_jit(10)
compute_yn_aot(10)
compute_yn_jit(10)
GC.gc()
sleep(3)

# Change this to `true` to enable runtime and allocation profiling!
const profile = false
profile && Profile.init(; n=Profile.init()[1], delay=0.0001)
const alloc_profile = profile && VERSION >= v"1.8-"  # Allocs profiler needs Julia 1.8+

# Use a single, smaller N when profiling to keep the data manageable
Ns = profile ? [2_000] : [1_000, 5_000, 10_000, 100_000, 200_000, 500_000]
for N in Ns
    print("s_n @ $N (AOT): ")
    @time compute_sn_aot(N)
    profile && Profile.@profile compute_sn_aot(N)
    # `@eval` guards the 1.8+-only `Profile.Allocs` macro; `$N` must be
    # interpolated so the loop variable is visible at global scope
    alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_sn_aot($N)
    print("s_n @ $N (JIT): ")
    @time compute_sn_jit(N)
    profile && Profile.@profile compute_sn_jit(N)
    alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_sn_jit($N)
    print("y_n @ $N (AOT): ")
    @time compute_yn_aot(N)
    profile && Profile.@profile compute_yn_aot(N)
    alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_yn_aot($N)
    print("y_n @ $N (JIT): ")
    @time compute_yn_jit(N)
    profile && Profile.@profile compute_yn_jit(N)
    alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_yn_jit($N)
end
using Printf

# Summarize allocation-profile results, either raw or grouped by type
function print_allocs(allocs; minsize=64, raw=false)
    allocs = sort(filter(a -> a.size >= minsize, allocs.allocs); by=a -> a.size)
    if raw
        for alloc in allocs
            @printf "%10s [%s]\n" Base.format_bytes(alloc.size) alloc.type
        end
    else
        per_type = Dict{Type,Int}()
        for alloc in allocs
            per_type[alloc.type] = get(per_type, alloc.type, 0) + alloc.size
        end
        for T_alloc in sort([per_type...]; by=x -> x[2])
            @printf "%10s [%s]\n" Base.format_bytes(T_alloc[2]) T_alloc[1]
        end
    end
end

if profile
    const results = Profile.retrieve()
    Profile.print(results...; mincount=500, maxdepth=20, C=true)
end
if alloc_profile
    const allocs = Profile.Allocs.fetch()
    print_allocs(allocs)
end
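
One addition that may help interpret the timings above (my own sketch, not part of the original gist): a plain, scheduler-free version of the same recurrence gives a lower bound that separates Dagger's task-graph overhead from the actual compute time.

# Hypothetical baseline (not in the gist): the same recurrence with no
# task scheduler, to bound the pure compute time.
function compute_yn_plain(N)
    f0 = rand(10)
    f1 = rand(10)
    for _ in 1:N
        f0, f1 = f1, .3 .* f0 .+ .7 .* f1
    end
    return f1
end

print("y_n baseline: ")
@time compute_yn_plain(100_000)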
import dask
import numpy as np
import timeit

@dask.delayed
def fib0(n):
    # `n` only serves to make the two seed nodes distinct
    return np.random.rand(10)

@dask.delayed
def wsum(a, b):
    return .3 * a + .7 * b

def compute_sn(N):
    f0 = fib0(1)
    f1 = fib0(2)
    even = [f0]
    for i in range(N):
        f2 = wsum(f0, f1)
        f0, f1 = f1, f2
        if i % 2 == 1:  # 0-based, so this matches the Julia `i%2 == 0` check
            even.append(f2)
    return dask.delayed(sum)(even).compute()

def compute_yn(N):
    f0 = fib0(1)
    f1 = fib0(2)
    f2 = f1  # will hold the final delayed node
    for i in range(N):
        f2 = wsum(f0, f1)
        f0, f1 = f1, f2
    return f2.compute()

for N in [1000, 5000, 10000]:
    print("s_n @ %d: %s" % (N, timeit.timeit(lambda: compute_sn(N), number=3)))
    print("y_n @ %d: %s" % (N, timeit.timeit(lambda: compute_yn(N), number=3)))
Also, I'm seeing another error on the #main branch for the following line (I'm using Julia v1.6.6):
@time compute_yn_jit(100000)
Error in enqueued work:
schedule: Task not runnable
Error in enqueued work:
AssertionError: poolunref_owner called before any poolref_owner
Stacktrace:
[1] (::MemPool.var"#26#28"{Int64, Dict{Int64, Base.Threads.Atomic{Int64}}})()
@ MemPool ~/.julia/packages/MemPool/wlrUg/src/datastore.jl:173
[2] macro expansion
@ ~/.julia/packages/MemPool/wlrUg/src/lock.jl:42 [inlined]
[3] with_datastore_lock(f::MemPool.var"#26#28"{Int64, Dict{Int64, Base.Threads.Atomic{Int64}}})
@ MemPool ~/.julia/packages/MemPool/wlrUg/src/datastore.jl:218
[4] poolunref_owner
@ ~/.julia/packages/MemPool/wlrUg/src/datastore.jl:172 [inlined]
[5] #137
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:354 [inlined]
[6] run_work_thunk(thunk::Distributed.var"#137#138"{typeof(MemPool.poolunref_owner), Tuple{Int64, Dict{Int64, Base.Threads.Atomic{Int64}}}, Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}}}, print_error::Bool)
@ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:63
[7] run_work_thunk
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:72 [inlined]
[8] (::Distributed.var"#96#98"{Distributed.RemoteValue, Distributed.var"#137#138"{typeof(MemPool.poolunref_owner), Tuple{Int64, Dict{Int64, Base.Threads.Atomic{Int64}}}, Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}}}})()
@ Distributed ./task.jl:411
Odd, can you tell me which versions of Julia and MemPool.jl you're using here?
Those errors were with Julia 1.6.6 and MemPool v0.3.9. I also tried Julia 1.7.2, and there it seems to deadlock; for example, the following code gets stuck and never returns for me under Julia 1.7.2:
N = 10
f0 = Dagger.@spawn rand(10)
f1 = Dagger.@spawn rand(10)
@time begin
    even = [f0]
    for i in 1:N
        f2 = Dagger.@spawn fibsum(f0, f1)
        f0, f1 = f1, f2
        if i % 2 == 0
            push!(even, f2)
        end
    end
    even = fetch.(even)
    fetch(Dagger.@spawn sum(even))
end
@juliusrules can you get me a stacktrace of where this is stuck? I usually send a SIGTERM
to the process (instead of Ctrl+C) when I hit a deadlock to kill it and dump the stacktrace.
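For anyone following along, a minimal way to do that from a second Julia session (the PID below is a placeholder; plain `kill -TERM <pid>` from a shell works just as well):

# Send SIGTERM to the stuck Julia process; per the note above, this kills
# it and dumps the stacktrace. 12345 is a placeholder for the actual PID.
pid = 12345
run(`kill -TERM $pid`)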
Now I'm getting a different error with Julia 1.7.3 + Dagger#master:
julius@julius:~$ /usr/local/julia-1.7.3/bin/julia
[Julia REPL banner: Version 1.7.3 (2022-05-06), official release]
julia> using Dagger
[ Info: Precompiling Dagger [d58978e5-989f-55fb-8d15-ea34adc7bf54]
julia> N = 10
10
julia> f0 = Dagger.@spawn rand(10)
Error in eager scheduler:
ThunkFailedException (Thunk[1](eager_thunk, Any[]) failure):
jl_set_task_tid == 0
Stacktrace:
[1] error(s::String)
@ Base ./error.jl:33
[2] execute!(::Dagger.ThreadProc, ::Any)
@ Dagger ~/dev/Julius/Dagger/src/processor.jl:173
[3] #126
@ ~/dev/Julius/Dagger/src/sch/Sch.jl:1058 [inlined]
[4] (::Dagger.var"#62#63"{Dagger.Sch.var"#126#135"{Dagger.ThreadProc, Vector{Any}}})()
@ Dagger ~/dev/Julius/Dagger/src/options.jl:7
[5] with_logstate(f::Function, logstate::Any)
@ Base.CoreLogging ./logging.jl:511
[6] with_logger(f::Function, logger::ContextVariablesX.ContextPayloadLogger)
@ Base.CoreLogging ./logging.jl:623
[7] with_task_ctxvars(f::Any, ctx::Any)
@ ContextVariablesX ~/.julia/packages/ContextVariablesX/wzJaf/src/payloadlogger.jl:16
[8] with_context(f::Function, kvs::Pair{ContextVariablesX.ContextVar{NamedTuple}, NamedTuple{(), Tuple{}}})
@ ContextVariablesX ~/.julia/packages/ContextVariablesX/wzJaf/src/ContextVariablesX.jl:336
[9] with_options(f::Dagger.Sch.var"#126#135"{Dagger.ThreadProc, Vector{Any}}, options::NamedTuple{(), Tuple{}})
@ Dagger ~/dev/Julius/Dagger/src/options.jl:6
[10] do_task(to_proc::Dagger.ThreadProc, comm::Vector{Any})
@ Dagger.Sch ~/dev/Julius/Dagger/src/sch/Sch.jl:1056
[11] macro expansion
@ ~/dev/Julius/Dagger/src/sch/Sch.jl:942 [inlined]
[12] (::Dagger.Sch.var"#116#118"{Dagger.ThreadProc, Distributed.RemoteChannel{Channel{Any}}, Vector{Any}})()
@ Dagger.Sch ./task.jl:429
Stacktrace:
[1] compute_dag(ctx::Context, d::Thunk; options::Dagger.Sch.SchedulerOptions)
@ Dagger.Sch ~/dev/Julius/Dagger/src/sch/Sch.jl:384
[2] compute(ctx::Context, d::Thunk; options::Dagger.Sch.SchedulerOptions)
@ Dagger ~/dev/Julius/Dagger/src/compute.jl:31
[3] (::Dagger.Sch.var"#51#52"{Context})()
@ Dagger.Sch ./task.jl:429
^CERROR: InterruptException:
Stacktrace:
[1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
@ Base ./task.jl:812
[2] wait()
@ Base ./task.jl:872
[3] wait(c::Base.GenericCondition{ReentrantLock})
@ Base ./condition.jl:123
[4] fetch_buffered(c::Channel{Any})
@ Base ./channels.jl:365
[5] fetch(c::Channel{Any})
@ Base ./channels.jl:359
[6] fetch(r::Distributed.Future)
@ Distributed /usr/local/julia-1.7.3/share/julia/stdlib/v1.7/Distributed/src/remotecall.jl:620
[7] _spawn(f::Function, args::Int64; options::NamedTuple{(), Tuple{}}, kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Dagger ~/dev/Julius/Dagger/src/thunk.jl:255
[8] spawn(f::Function, args::Int64; processor::Nothing, scope::Nothing, kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Dagger ~/dev/Julius/Dagger/src/thunk.jl:275
[9] spawn(f::Function, args::Int64)
@ Dagger ~/dev/Julius/Dagger/src/thunk.jl:268
[10] top-level scope
@ ~/dev/Julius/Dagger/src/thunk.jl:334
I'm getting the error below when running the #main branch of Dagger; for smaller N it worked fine and is a lot faster!
error in running finalizer: ErrorException("task switch not allowed from inside gc finalizer")
jl_error at /buildworker/worker/package_linux64/build/src/rtutils.c:41
jl_switch at /buildworker/worker/package_linux64/build/src/task.c:501
try_yieldto at ./task.jl:700
wait at ./task.jl:770
wait at ./condition.jl:106
lock at ./lock.jl:100
lock at ./lock.jl:185
lock at ./weakkeydict.jl:87 [inlined]
delete! at ./weakkeydict.jl:166 [inlined]
finalize_ref at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:92
_jl_invoke at /buildworker/worker/package_linux64/build/src/gf.c:2237 [inlined]
jl_apply_generic at /buildworker/worker/package_linux64/build/src/gf.c:2419
jl_apply at /buildworker/worker/package_linux64/build/src/julia.h:1703 [inlined]
run_finalizer at /buildworker/worker/package_linux64/build/src/gc.c:278
jl_gc_run_finalizers_in_list at /buildworker/worker/package_linux64/build/src/gc.c:365
run_finalizers at /buildworker/worker/package_linux64/build/src/gc.c:394
jl_gc_collect at /buildworker/worker/package_linux64/build/src/gc.c:3259
maybe_collect at /buildworker/worker/package_linux64/build/src/gc.c:879 [inlined]
jl_gc_pool_alloc at /buildworker/worker/package_linux64/build/src/gc.c:1203
indexed_iterate at ./tuple.jl:86 [inlined]
indexed_iterate at ./tuple.jl:86
unknown function (ip: 0x7f41cb583308)
_jl_invoke at /buildworker/worker/package_linux64/build/src/gf.c:2237 [inlined]
jl_apply_generic at /buildworker/worker/package_linux64/build/src/gf.c:2419
macro expansion at /home/julius/.julia/packages/MemPool/wlrUg/src/datastore.jl:100 [inlined]
#16 at ./task.jl:411
unknown function (ip: 0x7f41cb58318c)
_jl_invoke at /buildworker/worker/package_linux64/build/src/gf.c:2237 [inlined]
jl_apply_generic at /buildworker/worker/package_linux64/build/src/gf.c:2419
jl_apply at /buildworker/worker/package_linux64/build/src/julia.h:1703 [inlined]
start_task at /buildworker/worker/package_linux64/build/src/task.c:833
Error in enqueued work:
schedule: Task not runnable
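For context on that last error, here is an illustration of the mechanism (my simplified sketch, not MemPool's or Distributed's actual code): finalizers run during GC and are not allowed to switch tasks, so a finalizer that has to wait on a contended lock, as the `lock at ./weakkeydict.jl:87` frame in the trace above suggests Distributed's remote-reference finalizer did, dies with exactly this "task switch not allowed" message.

# Simplified sketch (assumption: not the real MemPool/Distributed code).
# Waiting on a contended ReentrantLock yields the current task, which is
# forbidden while GC is running finalizers.
const LK = ReentrantLock()

mutable struct RemoteRefLike
    id::Int
end

r = RemoteRefLike(1)
finalizer(r) do ref
    # If LK happens to be held by another task when GC runs this
    # finalizer, `lock` blocks, and blocking here raises
    # "task switch not allowed from inside gc finalizer".
    lock(() -> nothing, LK)
end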