@jpsamaroo
Created May 18, 2022 21:06

Julius Graph Benchmarks: Dagger and Dask

using Dagger
using Profile

fibsum(a, b) = .3 .* a .+ .7 .* b

function compute_sn_aot(N)
    f0 = delayed(rand)(10)
    f1 = delayed(rand)(10)
    s = 0. # result held here
    even = [f0]
    for i in 1:N
        f2 = delayed(fibsum)(f0, f1)
        f0, f1 = f1, f2
        if i%2 == 0
            push!(even, f2)
        end
    end
    even = compute.(even)
    collect(delayed(sum)(even))
end

function compute_sn_jit(N)
    f0 = Dagger.@spawn rand(10)
    f1 = Dagger.@spawn rand(10)
    s = 0. # result held here
    even = [f0]
    for i in 1:N
        f2 = Dagger.@spawn fibsum(f0, f1)
        f0, f1 = f1, f2
        if i%2 == 0
            push!(even, f2)
        end
    end
    even = fetch.(even)
    fetch(Dagger.@spawn sum(even))
end

function compute_yn_aot(N)
    f0 = delayed(rand)(10)
    f1 = delayed(rand)(10)
    f2 = 0. # results here
    for i in 1:N
        f2 = delayed(fibsum)(f0, f1)
        f0, f1 = f1, f2
    end
    return collect(f2)
end

function compute_yn_jit(N)
    f0 = Dagger.@spawn rand(10)
    f1 = Dagger.@spawn rand(10)
    f2 = 0. # results here
    for i in 1:N
        f2 = Dagger.@spawn fibsum(f0, f1)
        f0, f1 = f1, f2
    end
    return fetch(f2)
end

# Warm-up the execution and GC code paths
compute_sn_aot(10)
compute_sn_jit(10)
compute_yn_aot(10)
compute_yn_jit(10)
GC.gc()
sleep(3)

# Change this to `true` to enable runtime and allocation profiling!
const profile = false
profile && Profile.init(; n=Profile.init()[1], delay=0.0001)
const alloc_profile = profile && VERSION >= v"1.8-"

if profile
    Ns = [2_000]
else
    Ns = [1_000, 5_000, 10_000, 100_000, 200_000, 500_000]
end

for N in Ns
    print("s_n @ $N (AOT): ")
    @time compute_sn_aot(N)
    profile && Profile.@profile compute_sn_aot(N)
    alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_sn_aot($N)
    print("s_n @ $N (JIT): ")
    @time compute_sn_jit(N)
    profile && Profile.@profile compute_sn_jit(N)
    alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_sn_jit($N)
    print("y_n @ $N (AOT): ")
    @time compute_yn_aot(N)
    profile && Profile.@profile compute_yn_aot(N)
    alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_yn_aot($N)
    print("y_n @ $N (JIT): ")
    @time compute_yn_jit(N)
    profile && Profile.@profile compute_yn_jit(N)
    alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_yn_jit($N)
end

using Printf

function print_allocs(allocs; minsize=64, raw=false)
    allocs = sort(filter(a->a.size >= minsize, allocs.allocs); by=a->a.size)
    if raw
        for alloc in allocs
            @printf "%10s [%s]\n" Base.format_bytes(alloc.size) alloc.type
        end
    else
        per_type = Dict{Type,Int}()
        for alloc in allocs
            per_type[alloc.type] = get(per_type, alloc.type, 0) + alloc.size
        end
        for T_alloc in sort([per_type...]; by=x->x[2])
            @printf "%10s [%s]\n" Base.format_bytes(T_alloc[2]) T_alloc[1]
        end
    end
end

if profile
    const results = Profile.retrieve()
    Profile.print(results...; mincount=500, maxdepth=20, C=true)
end
if alloc_profile
    const allocs = Profile.Allocs.fetch()
    print_allocs(allocs)
end

import dask
import numpy as np
import timeit

@dask.delayed
def fib0(n):
    return np.random.rand(10)

@dask.delayed
def wsum(a, b):
    return (.3*a + .7*b)

def compute_sn(N):
    f0 = fib0(1)
    f1 = fib0(2)
    even = [f0]
    for i in range(0, N):
        f2 = wsum(f0, f1)
        f0, f1 = f1, f2
        if (i%2 == 1):
            even.append(f2)
    v = dask.delayed(sum)(even)
    v.compute()

def compute_yn(N):
    f0 = fib0(1)
    f1 = fib0(2)
    f2 = None
    for i in range(0, N):
        f2 = wsum(f0, f1)
        f0, f1 = f1, f2
    f2.compute()

for N in [1000, 5000, 10000]:
    print("s_n @ %d: %s" % (N, timeit.timeit(lambda: compute_sn(N), number=3)))
    print("y_n @ %d: %s" % (N, timeit.timeit(lambda: compute_yn(N), number=3)))
@jpsamaroo (Author)

@juliusrules can you get me a stacktrace of where this is stuck? I usually send a SIGTERM to the process (instead of Ctrl+C) when I hit a deadlock to kill it and dump the stacktrace.
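
For reference, a minimal sketch of that workflow (the PID below is a placeholder for whatever `pgrep julia` or your process monitor reports for the hung process):

# Hypothetical sketch: send SIGTERM to the stuck Julia process, either from a
# shell (`kill -TERM <pid>`) or from a second Julia session as shown here, so
# it is killed and dumps a stacktrace rather than being interrupted with Ctrl+C.
stuck_pid = 12345            # placeholder: PID of the hung Julia process
run(`kill -TERM $stuck_pid`) # equivalent to running `kill -TERM 12345` in a shell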

@juliusrules

Now I'm getting a different error with Julia 1.7.3 + Dagger#master:

julius@julius:~$ /usr/local/julia-1.7.3/bin/julia
               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.7.3 (2022-05-06)
 _/ |\__'_|_|_|\__'_|  |  Official https://julialang.org/ release
|__/                   |

julia> using Dagger
[ Info: Precompiling Dagger [d58978e5-989f-55fb-8d15-ea34adc7bf54]

julia> N = 10
10

julia> f0 = Dagger.@spawn rand(10)
Error in eager scheduler:
ThunkFailedException (Thunk[1](eager_thunk, Any[]) failure):
jl_set_task_tid == 0
Stacktrace:
[1] error(s::String)
@ Base ./error.jl:33
[2] execute!(::Dagger.ThreadProc, ::Any)
@ Dagger ~/dev/Julius/Dagger/src/processor.jl:173
[3] #126
@ ~/dev/Julius/Dagger/src/sch/Sch.jl:1058 [inlined]
[4] (::Dagger.var"#62#63"{Dagger.Sch.var"#126#135"{Dagger.ThreadProc, Vector{Any}}})()
@ Dagger ~/dev/Julius/Dagger/src/options.jl:7
[5] with_logstate(f::Function, logstate::Any)
@ Base.CoreLogging ./logging.jl:511
[6] with_logger(f::Function, logger::ContextVariablesX.ContextPayloadLogger)
@ Base.CoreLogging ./logging.jl:623
[7] with_task_ctxvars(f::Any, ctx::Any)
@ ContextVariablesX ~/.julia/packages/ContextVariablesX/wzJaf/src/payloadlogger.jl:16
[8] with_context(f::Function, kvs::Pair{ContextVariablesX.ContextVar{NamedTuple}, NamedTuple{(), Tuple{}}})
@ ContextVariablesX ~/.julia/packages/ContextVariablesX/wzJaf/src/ContextVariablesX.jl:336
[9] with_options(f::Dagger.Sch.var"#126#135"{Dagger.ThreadProc, Vector{Any}}, options::NamedTuple{(), Tuple{}})
@ Dagger ~/dev/Julius/Dagger/src/options.jl:6
[10] do_task(to_proc::Dagger.ThreadProc, comm::Vector{Any})
@ Dagger.Sch ~/dev/Julius/Dagger/src/sch/Sch.jl:1056
[11] macro expansion
@ ~/dev/Julius/Dagger/src/sch/Sch.jl:942 [inlined]
[12] (::Dagger.Sch.var"#116#118"{Dagger.ThreadProc, Distributed.RemoteChannel{Channel{Any}}, Vector{Any}})()
@ Dagger.Sch ./task.jl:429
Stacktrace:
[1] compute_dag(ctx::Context, d::Thunk; options::Dagger.Sch.SchedulerOptions)
@ Dagger.Sch ~/dev/Julius/Dagger/src/sch/Sch.jl:384
[2] compute(ctx::Context, d::Thunk; options::Dagger.Sch.SchedulerOptions)
@ Dagger ~/dev/Julius/Dagger/src/compute.jl:31
[3] (::Dagger.Sch.var"#51#52"{Context})()
@ Dagger.Sch ./task.jl:429
^CERROR: InterruptException:
Stacktrace:
[1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
@ Base ./task.jl:812
[2] wait()
@ Base ./task.jl:872
[3] wait(c::Base.GenericCondition{ReentrantLock})
@ Base ./condition.jl:123
[4] fetch_buffered(c::Channel{Any})
@ Base ./channels.jl:365
[5] fetch(c::Channel{Any})
@ Base ./channels.jl:359
[6] fetch(r::Distributed.Future)
@ Distributed /usr/local/julia-1.7.3/share/julia/stdlib/v1.7/Distributed/src/remotecall.jl:620
[7] _spawn(f::Function, args::Int64; options::NamedTuple{(), Tuple{}}, kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Dagger ~/dev/Julius/Dagger/src/thunk.jl:255
[8] spawn(f::Function, args::Int64; processor::Nothing, scope::Nothing, kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Dagger ~/dev/Julius/Dagger/src/thunk.jl:275
[9] spawn(f::Function, args::Int64)
@ Dagger ~/dev/Julius/Dagger/src/thunk.jl:268
[10] top-level scope
@ ~/dev/Julius/Dagger/src/thunk.jl:334
