Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Julius Graph Benchmarks: Dagger and Dask
using Dagger
using Profile
# One step of the weighted "Fibonacci" recurrence: element-wise 0.3*a + 0.7*b.
fibsum(a, b) = @. 0.3a + 0.7b
"""
    compute_sn_aot(N)

Build an `N`-step `fibsum` chain ahead-of-time with Dagger's `delayed` API,
keeping every even-step term, then compute and return the (element-wise)
sum of those terms via a final `delayed(sum)` node.
"""
function compute_sn_aot(N)
    f0 = delayed(rand)(10)
    f1 = delayed(rand)(10)
    # NOTE: the original held an unused `s = 0.` here; removed as dead code.
    even = [f0]  # even-step terms of the sequence, seeded with f0
    for i in 1:N
        f2 = delayed(fibsum)(f0, f1)
        f0, f1 = f1, f2
        if i % 2 == 0
            push!(even, f2)
        end
    end
    # Force every collected term, then sum them in one more delayed node.
    even = compute.(even)
    collect(delayed(sum)(even))
end
"""
    compute_sn_jit(N)

Same benchmark as [`compute_sn_aot`](@ref) but using Dagger's eager
(`Dagger.@spawn` / `fetch`) API: spawn `N` `fibsum` steps, keep every
even-step term, and return the fetched sum of those terms.
"""
function compute_sn_jit(N)
    f0 = Dagger.@spawn rand(10)
    f1 = Dagger.@spawn rand(10)
    # NOTE: the original held an unused `s = 0.` here; removed as dead code.
    even = [f0]  # even-step terms of the sequence, seeded with f0
    for i in 1:N
        f2 = Dagger.@spawn fibsum(f0, f1)
        f0, f1 = f1, f2
        if i % 2 == 0
            push!(even, f2)
        end
    end
    # Materialize every collected term, then sum them in one more task.
    even = fetch.(even)
    fetch(Dagger.@spawn sum(even))
end
# Build the y_N chain ahead-of-time: run the fibsum recurrence for N steps
# with `delayed` nodes and materialize only the final term.
# NOTE(review): when N == 0 this collects the Float64 literal 0.0 instead of
# a task result — presumably callers always pass N >= 1; confirm.
function compute_yn_aot(N)
    prev = delayed(rand)(10)
    curr = delayed(rand)(10)
    latest = 0. # most recent term of the chain
    for _ in 1:N
        latest = delayed(fibsum)(prev, curr)
        prev, curr = curr, latest
    end
    return collect(latest)
end
# Eager-API twin of compute_yn_aot: spawn N fibsum steps with Dagger.@spawn
# and fetch only the final term.
# NOTE(review): when N == 0 this fetches the Float64 literal 0.0 instead of
# a task result — presumably callers always pass N >= 1; confirm.
function compute_yn_jit(N)
    prev = Dagger.@spawn rand(10)
    curr = Dagger.@spawn rand(10)
    latest = 0. # most recent term of the chain
    for _ in 1:N
        latest = Dagger.@spawn fibsum(prev, curr)
        prev, curr = curr, latest
    end
    return fetch(latest)
end
# Warm-up the execution and GC code paths
# (small runs so the @time numbers below exclude compilation latency)
compute_sn_aot(10)
compute_sn_jit(10)
compute_yn_aot(10)
compute_yn_jit(10)
GC.gc()
sleep(3)
# Change this to `true` to enable runtime and allocation profiling!
const profile = false
# Re-init the profiler with a much finer sampling delay (keep current buffer size).
profile && Profile.init(; n=Profile.init()[1], delay=0.0001)
# Profile.Allocs only exists on Julia 1.8+.
const alloc_profile = profile && VERSION >= v"1.8-"
# Use a single, smaller N when profiling so the profile buffers don't overflow.
if profile
Ns = [2_000]
else
Ns = [1_000, 5_000, 10_000, 100_000, 200_000, 500_000]
end
# Time (and optionally profile) each benchmark at every problem size.
# BUG FIX: the allocation-profiling lines for the AOT variants used a bare
# `N` inside `@eval`. `@eval` evaluates its expression in global scope, where
# the loop-local `N` is not defined, so those lines threw UndefVarError when
# alloc_profile was enabled. They now interpolate `$N` like the JIT lines.
for N in Ns
    print("s_n @ $N (AOT): ")
    @time compute_sn_aot(N)
    profile && Profile.@profile compute_sn_aot(N)
    alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_sn_aot($N)
    print("s_n @ $N (JIT): ")
    @time compute_sn_jit(N)
    profile && Profile.@profile compute_sn_jit(N)
    alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_sn_jit($N)
    print("y_n @ $N (AOT): ")
    @time compute_yn_aot(N)
    profile && Profile.@profile compute_yn_aot(N)
    alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_yn_aot($N)
    print("y_n @ $N (JIT): ")
    @time compute_yn_jit(N)
    profile && Profile.@profile compute_yn_jit(N)
    alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_yn_jit($N)
end
using Printf
"""
    print_allocs(allocs; minsize=64, raw=false)

Pretty-print an allocation profile (e.g. the result of
`Profile.Allocs.fetch()`). Allocations smaller than `minsize` bytes are
dropped. With `raw=true` each surviving allocation is listed individually;
otherwise sizes are aggregated per allocated type. Rows are printed in
ascending size order.
"""
function print_allocs(allocs; minsize=64, raw=false)
    kept = sort(filter(a -> a.size >= minsize, allocs.allocs); by=a -> a.size)
    if raw
        for entry in kept
            @printf "%10s [%s]\n" Base.format_bytes(entry.size) entry.type
        end
    else
        # Total bytes allocated, keyed by the allocated type.
        totals = Dict{Type,Int}()
        for entry in kept
            totals[entry.type] = get(totals, entry.type, 0) + entry.size
        end
        for (T, bytes) in sort(collect(totals); by=last)
            @printf "%10s [%s]\n" Base.format_bytes(bytes) T
        end
    end
end
# Dump the runtime profile collected above (only when profiling was enabled).
if profile
const results = Profile.retrieve()
Profile.print(results...; mincount=500, maxdepth=20, C=true)
end
# Dump the allocation profile (Julia >= 1.8 only; see alloc_profile above).
if alloc_profile
const allocs = Profile.Allocs.fetch()
print_allocs(allocs)
end
import dask
import numpy as np
import timeit
@dask.delayed
def fib0(n):
    """Seed vector for the sequence: 10 uniform random values.

    The argument is never read — presumably it only keeps the two seed
    nodes distinct in the task graph; confirm against Dask's caching rules.
    """
    return np.random.rand(10)
@dask.delayed
def wsum(a, b):
    """One fibsum step: the weighted sum 0.3*a + 0.7*b (element-wise)."""
    weighted = 0.3 * a + 0.7 * b
    return weighted
def compute_sn(N):
    """Build the s_N graph (sum of every other chain term) and force it."""
    prev = fib0(1)
    curr = fib0(2)
    collected = [prev]  # every other term of the chain, seeded with prev
    for step in range(N):
        nxt = wsum(prev, curr)
        prev, curr = curr, nxt
        if step % 2 == 1:
            collected.append(nxt)
    total = dask.delayed(sum)(collected)
    total.compute()
def compute_yn(N):
    """Build the y_N chain of N wsum steps and force only the final term.

    NOTE(review): with N == 0 the trailing `.compute()` runs on None and
    raises AttributeError — presumably N >= 1 is always used; confirm.
    """
    prev, curr = fib0(1), fib0(2)
    last = None
    for _ in range(N):
        last = wsum(prev, curr)
        prev, curr = curr, last
    last.compute()
# Time both graph shapes at each size. timeit.timeit with number=3 returns
# the TOTAL wall time of 3 runs, not the average.
for N in [1000, 5000, 10000]:
    sn_seconds = timeit.timeit(lambda: compute_sn(N), number=3)
    print("s_n @ %d: %s" % (N, sn_seconds))
    yn_seconds = timeit.timeit(lambda: compute_yn(N), number=3)
    print("y_n @ %d: %s" % (N, yn_seconds))
@juliusrules
Copy link

juliusrules commented May 19, 2022

I am getting this error when running the #main branch of Dagger, from:

For smaller N it worked fine and was a lot faster!

@time compute_sn_jit(100000)

error in running finalizer: ErrorException("task switch not allowed from inside gc finalizer")
jl_error at /buildworker/worker/package_linux64/build/src/rtutils.c:41
jl_switch at /buildworker/worker/package_linux64/build/src/task.c:501
try_yieldto at ./task.jl:700
wait at ./task.jl:770
wait at ./condition.jl:106
lock at ./lock.jl:100
lock at ./lock.jl:185
lock at ./weakkeydict.jl:87 [inlined]
delete! at ./weakkeydict.jl:166 [inlined]
finalize_ref at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:92
_jl_invoke at /buildworker/worker/package_linux64/build/src/gf.c:2237 [inlined]
jl_apply_generic at /buildworker/worker/package_linux64/build/src/gf.c:2419
jl_apply at /buildworker/worker/package_linux64/build/src/julia.h:1703 [inlined]
run_finalizer at /buildworker/worker/package_linux64/build/src/gc.c:278
jl_gc_run_finalizers_in_list at /buildworker/worker/package_linux64/build/src/gc.c:365
run_finalizers at /buildworker/worker/package_linux64/build/src/gc.c:394
jl_gc_collect at /buildworker/worker/package_linux64/build/src/gc.c:3259
maybe_collect at /buildworker/worker/package_linux64/build/src/gc.c:879 [inlined]
jl_gc_pool_alloc at /buildworker/worker/package_linux64/build/src/gc.c:1203
indexed_iterate at ./tuple.jl:86 [inlined]
indexed_iterate at ./tuple.jl:86
unknown function (ip: 0x7f41cb583308)
_jl_invoke at /buildworker/worker/package_linux64/build/src/gf.c:2237 [inlined]
jl_apply_generic at /buildworker/worker/package_linux64/build/src/gf.c:2419
macro expansion at /home/julius/.julia/packages/MemPool/wlrUg/src/datastore.jl:100 [inlined]
#16 at ./task.jl:411
unknown function (ip: 0x7f41cb58318c)
_jl_invoke at /buildworker/worker/package_linux64/build/src/gf.c:2237 [inlined]
jl_apply_generic at /buildworker/worker/package_linux64/build/src/gf.c:2419
jl_apply at /buildworker/worker/package_linux64/build/src/julia.h:1703 [inlined]
start_task at /buildworker/worker/package_linux64/build/src/task.c:833
Error in enqueued work:
schedule: Task not runnable

@juliusrules
Copy link

juliusrules commented May 19, 2022

Also, I'm seeing another error on the #main branch, for the following line (I'm using Julia v1.6.6):

@time compute_yn_jit(100000)

Error in enqueued work:
schedule: Task not runnable
Error in enqueued work:
AssertionError: poolunref_owner called before any poolref_owner
Stacktrace:
[1] (::MemPool.var"#26#28"{Int64, Dict{Int64, Base.Threads.Atomic{Int64}}})()
@ MemPool ~/.julia/packages/MemPool/wlrUg/src/datastore.jl:173
[2] macro expansion
@ ~/.julia/packages/MemPool/wlrUg/src/lock.jl:42 [inlined]
[3] with_datastore_lock(f::MemPool.var"#26#28"{Int64, Dict{Int64, Base.Threads.Atomic{Int64}}})
@ MemPool ~/.julia/packages/MemPool/wlrUg/src/datastore.jl:218
[4] poolunref_owner
@ ~/.julia/packages/MemPool/wlrUg/src/datastore.jl:172 [inlined]
[5] #137
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:354 [inlined]
[6] run_work_thunk(thunk::Distributed.var"#137#138"{typeof(MemPool.poolunref_owner), Tuple{Int64, Dict{Int64, Base.Threads.Atomic{Int64}}}, Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}}}, print_error::Bool)
@ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:63
[7] run_work_thunk
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:72 [inlined]
[8] (::Distributed.var"#96#98"{Distributed.RemoteValue, Distributed.var"#137#138"{typeof(MemPool.poolunref_owner), Tuple{Int64, Dict{Int64, Base.Threads.Atomic{Int64}}}, Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}}}})()
@ Distributed ./task.jl:411

@jpsamaroo
Copy link
Author

jpsamaroo commented May 22, 2022

Odd, can you tell me which versions of Julia and MemPool.jl you're using here?

@juliusrules
Copy link

juliusrules commented May 23, 2022

Those errors were with Julia 1.6.6 and MemPool v0.3.9. I also tried Julia 1.7.2, and somehow it seems to deadlock for me — for example, the following code gets stuck and does not return under Julia v1.7.2:

N = 10
f0 = Dagger.@spawn rand(10)
f1 = Dagger.@spawn rand(10)

@time begin
    even = [f0]
    for i in 1:N
        f2 = Dagger.@spawn fibsum(f0, f1)
        f0, f1 = f1, f2

        if i%2 == 0
            push!(even, f2)
        end
    end

    even = fetch.(even)
    fetch(Dagger.@spawn sum(even))
end

@jpsamaroo
Copy link
Author

jpsamaroo commented Jun 1, 2022

@juliusrules can you get me a stacktrace of where this is stuck? I usually send a SIGTERM to the process (instead of Ctrl+C) when I hit a deadlock to kill it and dump the stacktrace.

@juliusrules
Copy link

juliusrules commented Jun 4, 2022

Now I got a different error with Julia 1.7.3 + Dagger#master:

julius@julius:~$ /usr/local/julia-1.7.3/bin/julia
_
_ _ ()_ | Documentation: https://docs.julialang.org
() | () () |
_ _ | | __ _ | Type "?" for help, "]?" for Pkg help.
| | | | | | |/ ` | |
| | |
| | | | (
| | | Version 1.7.3 (2022-05-06)
/ |_'|||_'_| | Official https://julialang.org/ release
|__/ |

julia> using Dagger
[ Info: Precompiling Dagger [d58978e5-989f-55fb-8d15-ea34adc7bf54]

julia> N = 10
10

julia> f0 = Dagger.@Spawn rand(10)
Error in eager scheduler:
ThunkFailedException (Thunk[1](eager_thunk, Any[]) failure):
jl_set_task_tid == 0
Stacktrace:
[1] error(s::String)
@ Base ./error.jl:33
[2] execute!(::Dagger.ThreadProc, ::Any)
@ Dagger ~/dev/Julius/Dagger/src/processor.jl:173
[3] #126
@ ~/dev/Julius/Dagger/src/sch/Sch.jl:1058 [inlined]
[4] (::Dagger.var"#62#63"{Dagger.Sch.var"#126#135"{Dagger.ThreadProc, Vector{Any}}})()
@ Dagger ~/dev/Julius/Dagger/src/options.jl:7
[5] with_logstate(f::Function, logstate::Any)
@ Base.CoreLogging ./logging.jl:511
[6] with_logger(f::Function, logger::ContextVariablesX.ContextPayloadLogger)
@ Base.CoreLogging ./logging.jl:623
[7] with_task_ctxvars(f::Any, ctx::Any)
@ ContextVariablesX ~/.julia/packages/ContextVariablesX/wzJaf/src/payloadlogger.jl:16
[8] with_context(f::Function, kvs::Pair{ContextVariablesX.ContextVar{NamedTuple}, NamedTuple{(), Tuple{}}})
@ ContextVariablesX ~/.julia/packages/ContextVariablesX/wzJaf/src/ContextVariablesX.jl:336
[9] with_options(f::Dagger.Sch.var"#126#135"{Dagger.ThreadProc, Vector{Any}}, options::NamedTuple{(), Tuple{}})
@ Dagger ~/dev/Julius/Dagger/src/options.jl:6
[10] do_task(to_proc::Dagger.ThreadProc, comm::Vector{Any})
@ Dagger.Sch ~/dev/Julius/Dagger/src/sch/Sch.jl:1056
[11] macro expansion
@ ~/dev/Julius/Dagger/src/sch/Sch.jl:942 [inlined]
[12] (::Dagger.Sch.var"#116#118"{Dagger.ThreadProc, Distributed.RemoteChannel{Channel{Any}}, Vector{Any}})()
@ Dagger.Sch ./task.jl:429
Stacktrace:
[1] compute_dag(ctx::Context, d::Thunk; options::Dagger.Sch.SchedulerOptions)
@ Dagger.Sch ~/dev/Julius/Dagger/src/sch/Sch.jl:384
[2] compute(ctx::Context, d::Thunk; options::Dagger.Sch.SchedulerOptions)
@ Dagger ~/dev/Julius/Dagger/src/compute.jl:31
[3] (::Dagger.Sch.var"#51#52"{Context})()
@ Dagger.Sch ./task.jl:429
^CERROR: InterruptException:
Stacktrace:
[1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
@ Base ./task.jl:812
[2] wait()
@ Base ./task.jl:872
[3] wait(c::Base.GenericCondition{ReentrantLock})
@ Base ./condition.jl:123
[4] fetch_buffered(c::Channel{Any})
@ Base ./channels.jl:365
[5] fetch(c::Channel{Any})
@ Base ./channels.jl:359
[6] fetch(r::Distributed.Future)
@ Distributed /usr/local/julia-1.7.3/share/julia/stdlib/v1.7/Distributed/src/remotecall.jl:620
[7] _spawn(f::Function, args::Int64; options::NamedTuple{(), Tuple{}}, kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Dagger ~/dev/Julius/Dagger/src/thunk.jl:255
[8] spawn(f::Function, args::Int64; processor::Nothing, scope::Nothing, kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Dagger ~/dev/Julius/Dagger/src/thunk.jl:275
[9] spawn(f::Function, args::Int64)
@ Dagger ~/dev/Julius/Dagger/src/thunk.jl:268
[10] top-level scope
@ ~/dev/Julius/Dagger/src/thunk.jl:334

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment