Created
May 18, 2022 21:06
-
-
Save jpsamaroo/95c78b3361ae454a51916183f2cf346f to your computer and use it in GitHub Desktop.
Julius Graph Benchmarks: Dagger and Dask
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Dagger | |
using Profile | |
"""
    fibsum(a, b)

Return the elementwise weighted combination `0.3 * a + 0.7 * b`, evaluated as a single
fused broadcast.
"""
fibsum(a, b) = @. 0.3 * a + 0.7 * b
"""
    compute_sn_aot(N)

Build the `fibsum` recurrence with `delayed` thunks, then return the sum of the first seed
and of the term produced on every even-numbered iteration.

Two thunks wrapping `rand(10)` seed the recurrence; each of the `N` iterations creates one
new thunk `fibsum(f0, f1)` and shifts the window forward. The collected thunks are passed
through `compute` one by one before a final `delayed(sum)` over them is `collect`ed.
"""
function compute_sn_aot(N)
    f0 = delayed(rand)(10)
    f1 = delayed(rand)(10)
    even = [f0]  # thunks whose values are summed at the end
    for i in 1:N
        f2 = delayed(fibsum)(f0, f1)
        f0, f1 = f1, f2
        if i % 2 == 0
            push!(even, f2)
        end
    end
    # Bind the computed results to a new name so `even` keeps a single element type.
    computed = compute.(even)
    return collect(delayed(sum)(computed))
end
"""
    compute_sn_jit(N)

Build the `fibsum` recurrence with `Dagger.@spawn` tasks, then return the sum of the first
seed and of the term produced on every even-numbered iteration.

Two spawned `rand(10)` tasks seed the recurrence; each of the `N` iterations spawns one new
`fibsum(f0, f1)` task and shifts the window forward. The collected tasks are `fetch`ed one
by one, and their sum is computed in a final spawned task whose result is returned.
"""
function compute_sn_jit(N)
    f0 = Dagger.@spawn rand(10)
    f1 = Dagger.@spawn rand(10)
    even = [f0]  # tasks whose values are summed at the end
    for i in 1:N
        f2 = Dagger.@spawn fibsum(f0, f1)
        f0, f1 = f1, f2
        if i % 2 == 0
            push!(even, f2)
        end
    end
    # Bind the fetched results to a new name so `even` keeps a single element type.
    fetched = fetch.(even)
    return fetch(Dagger.@spawn sum(fetched))
end
"""
    compute_yn_aot(N)

Chain `N` `delayed(fibsum)` thunks, starting from two `delayed(rand)(10)` seeds, and
`collect` the last thunk in the chain.

When `1:N` is empty no thunk is chained and the result is `collect(0.0)`.
"""
function compute_yn_aot(N)
    older, newer = delayed(rand)(10), delayed(rand)(10)
    for _ in 1:N
        # After this step `newer` is the most recently created term.
        older, newer = newer, delayed(fibsum)(older, newer)
    end
    return collect(isempty(1:N) ? 0.0 : newer)
end
"""
    compute_yn_jit(N)

Chain `N` spawned `fibsum` tasks, starting from two spawned `rand(10)` seeds, and `fetch`
the last task in the chain.

When `1:N` is empty no task is chained and the result is `fetch(0.0)`.
"""
function compute_yn_jit(N)
    older = Dagger.@spawn rand(10)
    newer = Dagger.@spawn rand(10)
    for _ in 1:N
        next = Dagger.@spawn fibsum(older, newer)
        # After this step `newer` is the most recently spawned term.
        older, newer = newer, next
    end
    return fetch(isempty(1:N) ? 0.0 : newer)
end
# Warm up the execution and GC code paths: run every benchmark once at a small size
# before anything is timed.
for warmup in (compute_sn_aot, compute_sn_jit, compute_yn_aot, compute_yn_jit)
    warmup(10)
end
GC.gc()
# NOTE(review): presumably gives background work time to settle before timing — confirm.
sleep(3)
# Set this to `true` to enable runtime and allocation profiling.
const profile = false
# Keep the current sample-buffer size and take a sample every 0.1 ms.
profile && Profile.init(; n=Profile.init()[1], delay=0.0001)
# Allocation profiling is only enabled on Julia 1.8 and newer.
const alloc_profile = profile && VERSION >= v"1.8-"
# Problem sizes: a single moderate size when profiling, the full sweep otherwise.
Ns = profile ? [2_000] : [1_000, 5_000, 10_000, 100_000, 200_000, 500_000]
# Time each benchmark at every problem size in `Ns`. When `profile` is enabled, each
# benchmark is also run under the runtime profiler and, if `alloc_profile` is set, under
# the allocation profiler.
#
# The allocation-profiling calls go through `@eval`, which evaluates in global scope.
# The loop variable `N` is local to this `for` loop, so its value must be spliced into
# the expression with `\$N`; a bare `N` raises `UndefVarError` there.
# NOTE(review): `@eval` presumably also defers expansion of `Profile.Allocs.@profile`
# so the script still loads on Julia versions without `Profile.Allocs` — confirm.
for N in Ns
    print("s_n @ $N (AOT): ")
    @time compute_sn_aot(N)
    profile && Profile.@profile compute_sn_aot(N)
    alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_sn_aot($N)

    print("s_n @ $N (JIT): ")
    @time compute_sn_jit(N)
    profile && Profile.@profile compute_sn_jit(N)
    alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_sn_jit($N)

    print("y_n @ $N (AOT): ")
    @time compute_yn_aot(N)
    profile && Profile.@profile compute_yn_aot(N)
    alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_yn_aot($N)

    print("y_n @ $N (JIT): ")
    @time compute_yn_jit(N)
    profile && Profile.@profile compute_yn_jit(N)
    alloc_profile && @eval Profile.Allocs.@profile sample_rate=1.0 compute_yn_jit($N)
end
using Printf | |
"""
    print_allocs(allocs; minsize=64, raw=false)

Print a size report for the allocation records in `allocs.allocs` to standard output.

Records whose `size` is below `minsize` are ignored; the rest are ordered by ascending
`size`.

# Keywords
- `minsize`: smallest record size that is reported.
- `raw`: when `true`, print one line per record; otherwise add up the sizes per `type`
  and print one line per type, ordered by ascending total.

Each line shows the size formatted by `Base.format_bytes` followed by the type in square
brackets. Returns `nothing`.
"""
function print_allocs(allocs; minsize=64, raw=false)
    kept = filter(rec -> rec.size >= minsize, allocs.allocs)
    ordered = sort(kept; by=rec -> rec.size)

    if raw
        foreach(ordered) do rec
            @printf "%10s [%s]\n" Base.format_bytes(rec.size) rec.type
        end
        return nothing
    end

    # Accumulate the total number of bytes attributed to each type.
    totals = Dict{Type,Int}()
    for rec in ordered
        totals[rec.type] = get(totals, rec.type, 0) + rec.size
    end
    for (T, bytes) in sort(collect(totals); by=last)
        @printf "%10s [%s]\n" Base.format_bytes(bytes) T
    end
    return nothing
end
# Print the runtime profile collected during the benchmark runs, if profiling was enabled.
if profile
    const results = Profile.retrieve()
    Profile.print(results...; C=true, maxdepth=20, mincount=500)
end

# Print the allocation report, if allocation profiling was enabled.
if alloc_profile
    const allocs = Profile.Allocs.fetch()
    print_allocs(allocs)
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import dask | |
import numpy as np | |
import timeit | |
@dask.delayed
def fib0(n):
    """Return a fresh array of 10 uniform random numbers; ``n`` is not used in the body."""
    seed = np.random.rand(10)
    return seed
@dask.delayed
def wsum(a, b):
    """Return the weighted combination ``0.3 * a + 0.7 * b``."""
    return 0.3 * a + 0.7 * b
def compute_sn(N):
    """Build the ``wsum`` recurrence and compute the sum of the selected terms.

    Starts from ``fib0(1)`` and ``fib0(2)``, creates one new ``wsum`` node per
    iteration, and collects the first seed plus the node created on every
    odd-numbered (0-based) iteration. The delayed ``sum`` over that list is
    computed; the result is discarded and the function returns ``None``.
    """
    prev, curr = fib0(1), fib0(2)
    even = [prev]
    for step in range(N):
        prev, curr = curr, wsum(prev, curr)
        if step % 2 == 1:
            even.append(curr)
    dask.delayed(sum)(even).compute()
def compute_yn(N):
    """Chain ``N`` ``wsum`` nodes from two ``fib0`` seeds and compute the last one.

    The computed value is discarded and the function returns ``None``. With
    ``N == 0`` no node is created, and calling ``.compute()`` on the ``None``
    placeholder raises ``AttributeError``.
    """
    prev, curr = fib0(1), fib0(2)
    last = None
    for _ in range(N):
        last = wsum(prev, curr)
        prev, curr = curr, last
    last.compute()
# Time three runs of each benchmark at every problem size and print the total seconds.
for N in [1000, 5000, 10000]:
    sn_elapsed = timeit.timeit(lambda: compute_sn(N), number=3)
    print("s_n @ %d: %s" % (N, sn_elapsed))
    yn_elapsed = timeit.timeit(lambda: compute_yn(N), number=3)
    print("y_n @ %d: %s" % (N, yn_elapsed))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Now I get a different error with Julia 1.7.3 + Dagger#master:
julius@julius:~$ /usr/local/julia-1.7.3/bin/julia
               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.7.3 (2022-05-06)
 _/ |\__'_|_|_|\__'_|  |  Official https://julialang.org/ release
|__/                   |
julia> using Dagger
[ Info: Precompiling Dagger [d58978e5-989f-55fb-8d15-ea34adc7bf54]
julia> N = 10
10
julia> f0 = Dagger.@spawn rand(10)
Error in eager scheduler:
ThunkFailedException (Thunk[1](eager_thunk, Any[]) failure):
jl_set_task_tid == 0
Stacktrace:
[1] error(s::String)
@ Base ./error.jl:33
[2] execute!(::Dagger.ThreadProc, ::Any)
@ Dagger ~/dev/Julius/Dagger/src/processor.jl:173
[3] #126
@ ~/dev/Julius/Dagger/src/sch/Sch.jl:1058 [inlined]
[4] (::Dagger.var"#62#63"{Dagger.Sch.var"#126#135"{Dagger.ThreadProc, Vector{Any}}})()
@ Dagger ~/dev/Julius/Dagger/src/options.jl:7
[5] with_logstate(f::Function, logstate::Any)
@ Base.CoreLogging ./logging.jl:511
[6] with_logger(f::Function, logger::ContextVariablesX.ContextPayloadLogger)
@ Base.CoreLogging ./logging.jl:623
[7] with_task_ctxvars(f::Any, ctx::Any)
@ ContextVariablesX ~/.julia/packages/ContextVariablesX/wzJaf/src/payloadlogger.jl:16
[8] with_context(f::Function, kvs::Pair{ContextVariablesX.ContextVar{NamedTuple}, NamedTuple{(), Tuple{}}})
@ ContextVariablesX ~/.julia/packages/ContextVariablesX/wzJaf/src/ContextVariablesX.jl:336
[9] with_options(f::Dagger.Sch.var"#126#135"{Dagger.ThreadProc, Vector{Any}}, options::NamedTuple{(), Tuple{}})
@ Dagger ~/dev/Julius/Dagger/src/options.jl:6
[10] do_task(to_proc::Dagger.ThreadProc, comm::Vector{Any})
@ Dagger.Sch ~/dev/Julius/Dagger/src/sch/Sch.jl:1056
[11] macro expansion
@ ~/dev/Julius/Dagger/src/sch/Sch.jl:942 [inlined]
[12] (::Dagger.Sch.var"#116#118"{Dagger.ThreadProc, Distributed.RemoteChannel{Channel{Any}}, Vector{Any}})()
@ Dagger.Sch ./task.jl:429
Stacktrace:
[1] compute_dag(ctx::Context, d::Thunk; options::Dagger.Sch.SchedulerOptions)
@ Dagger.Sch ~/dev/Julius/Dagger/src/sch/Sch.jl:384
[2] compute(ctx::Context, d::Thunk; options::Dagger.Sch.SchedulerOptions)
@ Dagger ~/dev/Julius/Dagger/src/compute.jl:31
[3] (::Dagger.Sch.var"#51#52"{Context})()
@ Dagger.Sch ./task.jl:429
^CERROR: InterruptException:
Stacktrace:
[1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
@ Base ./task.jl:812
[2] wait()
@ Base ./task.jl:872
[3] wait(c::Base.GenericCondition{ReentrantLock})
@ Base ./condition.jl:123
[4] fetch_buffered(c::Channel{Any})
@ Base ./channels.jl:365
[5] fetch(c::Channel{Any})
@ Base ./channels.jl:359
[6] fetch(r::Distributed.Future)
@ Distributed /usr/local/julia-1.7.3/share/julia/stdlib/v1.7/Distributed/src/remotecall.jl:620
[7] _spawn(f::Function, args::Int64; options::NamedTuple{(), Tuple{}}, kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Dagger ~/dev/Julius/Dagger/src/thunk.jl:255
[8] spawn(f::Function, args::Int64; processor::Nothing, scope::Nothing, kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Dagger ~/dev/Julius/Dagger/src/thunk.jl:275
[9] spawn(f::Function, args::Int64)
@ Dagger ~/dev/Julius/Dagger/src/thunk.jl:268
[10] top-level scope
@ ~/dev/Julius/Dagger/src/thunk.jl:334