Created
April 21, 2024 08:42
-
-
Save Seelengrab/f0eddb28f139644ec71a9a1e8c4b42cd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Base.Threads | |
import Random | |
using Logging | |
function main(n=1000; exec=:base) | |
for seed in 1:n | |
executor = if exec == :base | |
BaseExecutor() | |
elseif exec == :par | |
ConcurrentExecutor(seed) | |
else | |
throw(ArgumentError("`exec` can only be one of `:base` and `:par`!")) | |
end | |
print(seed, ": ") | |
if run_test(executor) | |
printstyled("PASSED\n"; color=:green) | |
else | |
printstyled("FAILED\n"; color=:red) | |
end | |
end | |
end | |
###### | |
## Executors | |
###### | |
abstract type Executor end | |
struct BaseExecutor <: Executor end | |
block_on(::BaseExecutor, t::Task) = fetch(t) | |
struct TestRuntime | |
rng::Random.Xoshiro | |
runnables::Vector{Threads.Event} | |
lock::ReentrantLock | |
event::Threads.Event | |
end | |
Base.lock(tr::TestRuntime) = lock(tr.lock) | |
Base.unlock(tr::TestRuntime) = unlock(tr.lock) | |
Base.lock(f, tr::TestRuntime) = lock(f, tr.lock) | |
Base.notify(tr::TestRuntime) = notify(tr.event) | |
Base.wait(tr::TestRuntime) = wait(tr.event) | |
struct ConcurrentExecutor <: Executor | |
runtime::TestRuntime | |
function ConcurrentExecutor(seed) | |
runtime = TestRuntime( | |
Random.Xoshiro(seed), | |
Task[], | |
ReentrantLock(), | |
Threads.Event(true) | |
) | |
new(runtime) | |
end | |
end | |
Base.@propagate_inbounds function swap_remove!(v::Vector, idx::Int) | |
@boundscheck checkbounds(v, idx) | |
@inbounds begin | |
element = v[idx] | |
replacement = v[end] | |
v[idx] = replacement | |
resize!(v, length(v)-1) | |
end | |
return element | |
end | |
function block_on(pe::ConcurrentExecutor, t::Task) | |
c = current_task() | |
c == t && throw(ConcurrencyViolationError("Can't block on the currently running task!")) | |
while true | |
if istaskfailed(t) | |
@info "Task failed!" Cur=c T=t | |
return nothing | |
elseif istaskdone(t) | |
@debug "No longer blocked!" Cur=c T=t | |
return Some(Base.task_result(t)) | |
else | |
rt = pe.runtime | |
@debug "Still blocked!" Cur=c T=t | |
event = @lock rt begin | |
@debug rt.runnables | |
isempty(rt.runnables) && throw(ConcurrencyViolationError("$c cannot make progress!")) | |
# runnableidx = Supposition.choice!(Supposition.CURRENT_TESTCASE[], eachindex(rt.runnables)) | |
runnableidx = rand(rt.rng, 1:length(rt.runnables)) | |
swap_remove!(rt.runnables, runnableidx) | |
end | |
notify(event) | |
# yield() | |
# wait(rt.event) | |
# why doesn't `yield` work just the same?! | |
sleep(0.001) | |
end | |
end | |
end | |
macro spawn(exec, thunk) | |
build_executor(exec, thunk) | |
end | |
function build_executor(exec, thunk) | |
run_event = gensym(:run_event) | |
syncvar = esc(Base.sync_varname) | |
quote | |
executor = $(esc(exec)) | |
if executor isa ConcurrentExecutor | |
$run_event = $Threads.Event(true) | |
t = @task begin | |
wait($run_event) | |
res = $(esc(thunk)) | |
res | |
end | |
lock(executor.runtime) do | |
push!(executor.runtime.runnables, $run_event) | |
@debug "Pushed to runtime!" T=t | |
end | |
t.sticky = false | |
schedule(t) | |
elseif executor isa BaseExecutor | |
$Threads.@spawn $(esc(thunk)) | |
else | |
throw(ArgumentError("Unsupported Executor!")) | |
end | |
end | |
end | |
function run_test(exe::Executor) | |
t = @spawn exe begin | |
@debug "Starting test" Cur=current_task() | |
l = ReentrantLock() | |
data = Ref(0) | |
future1 = @spawn exe begin | |
@debug "Future 1 scheduled" | |
@lock l begin | |
if data[] == 0 | |
data[] += 1 | |
end | |
end | |
@debug "Done with 1!" | |
end | |
future2 = @spawn exe begin | |
@debug "Future 2 scheduled" | |
@lock l begin | |
if data[] == 1 | |
data[] += 1 | |
end | |
end | |
@debug "Done with 2!" | |
end | |
wait(future1) | |
wait(future2) | |
@debug "Done with subtasks!" | |
@lock l begin | |
data[] | |
end | |
end | |
res = @something block_on(exe, t) Some(nothing) | |
res == 2 | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment