Skip to content

Instantly share code, notes, and snippets.

@Seelengrab
Created April 21, 2024 08:42
Show Gist options
  • Save Seelengrab/f0eddb28f139644ec71a9a1e8c4b42cd to your computer and use it in GitHub Desktop.
Save Seelengrab/f0eddb28f139644ec71a9a1e8c4b42cd to your computer and use it in GitHub Desktop.
import Base.Threads
import Random
using Logging
"""
    main(n=1000; exec=:base)

Run `run_test` once for every seed in `1:n`, printing the seed followed by a
green `PASSED` or a red `FAILED`.

# Keywords
- `exec`: `:base` runs each test on a `BaseExecutor`; `:par` runs it on a
  `ConcurrentExecutor` built from the current seed.

# Throws
- `ArgumentError` if `exec` is neither `:base` nor `:par` (raised on the first
  iteration, so nothing is thrown when `n < 1`).
"""
function main(n=1000; exec=:base)
    for seed in 1:n
        # A fresh executor is built for every seed.
        if exec == :base
            executor = BaseExecutor()
        elseif exec == :par
            executor = ConcurrentExecutor(seed)
        else
            throw(ArgumentError("`exec` can only be one of `:base` and `:par`!"))
        end
        print(seed, ": ")
        passed = run_test(executor)
        verdict, color = passed ? ("PASSED\n", :green) : ("FAILED\n", :red)
        printstyled(verdict; color=color)
    end
end
######
## Executors
######
"""
    Executor

Abstract supertype of the executors that `@spawn` and `block_on` operate on.
"""
abstract type Executor end

"""
    BaseExecutor <: Executor

Field-less executor; blocking on a task with it is a plain `fetch`.
"""
struct BaseExecutor <: Executor end

"""
    block_on(::BaseExecutor, t::Task)

Wait for `t` to finish and return whatever `fetch(t)` returns.
"""
function block_on(::BaseExecutor, t::Task)
    return fetch(t)
end
"""
    TestRuntime

State shared between a `ConcurrentExecutor` and the tasks spawned on it.

# Fields
- `rng`: random number generator used to choose which runnable is released next.
- `runnables`: events of spawned tasks that have not been released yet.
- `lock`: lock held while `runnables` is read or modified.
- `event`: event that `notify`/`wait` on the runtime are forwarded to.
"""
struct TestRuntime
    rng::Random.Xoshiro
    runnables::Vector{Threads.Event}
    lock::ReentrantLock
    event::Threads.Event
end

# Locking a runtime means locking its internal `ReentrantLock`; these methods
# are what make `@lock rt ...` and `lock(rt) do ... end` work.
function Base.lock(tr::TestRuntime)
    return lock(tr.lock)
end

function Base.unlock(tr::TestRuntime)
    return unlock(tr.lock)
end

function Base.lock(f, tr::TestRuntime)
    return lock(f, tr.lock)
end

# Signalling and waiting on a runtime are forwarded to its `event`.
function Base.notify(tr::TestRuntime)
    return notify(tr.event)
end

function Base.wait(tr::TestRuntime)
    return wait(tr.event)
end
"""
    ConcurrentExecutor(seed) <: Executor

Executor that owns a `TestRuntime` whose random number generator is a
`Random.Xoshiro` seeded with `seed`.

The runtime starts with no runnables, a fresh `ReentrantLock`, and an
`Event` constructed with `true` (auto-reset).
"""
struct ConcurrentExecutor <: Executor
    runtime::TestRuntime
    function ConcurrentExecutor(seed)
        runtime = TestRuntime(
            Random.Xoshiro(seed),
            # The queue stores the run-events of spawned tasks, not the tasks
            # themselves, so start from an empty vector of the field's eltype
            # instead of relying on an implicit `convert` of `Task[]`.
            Threads.Event[],
            ReentrantLock(),
            Threads.Event(true),
        )
        return new(runtime)
    end
end
"""
    swap_remove!(v::Vector, idx::Int)

Remove and return `v[idx]`, filling the hole with the last element of `v`
and shrinking `v` by one. The order of the remaining elements is not
preserved.

# Throws
- `BoundsError` if `idx` is not a valid index of `v` (unless bounds checks
  are elided by an `@inbounds` caller).
"""
Base.@propagate_inbounds function swap_remove!(v::Vector, idx::Int)
    @boundscheck checkbounds(v, idx)
    @inbounds removed = v[idx]
    tail = pop!(v)
    # When `idx` was the final slot, popping already removed the element.
    if idx <= length(v)
        @inbounds v[idx] = tail
    end
    return removed
end
"""
    block_on(pe::ConcurrentExecutor, t::Task)

Drive the runtime of `pe` until `t` has finished.

While `t` is unfinished, repeatedly pick one pending runnable event at random
(using the runtime's RNG), remove it from the queue, notify it, and sleep
briefly before checking `t` again.

# Returns
- `Some(result)` once `t` is done.
- `nothing` if `t` failed (an `@info` record is logged).

# Throws
- `ConcurrencyViolationError` if called from `t` itself, or if `t` is
  unfinished while the runnable queue is empty.
"""
function block_on(pe::ConcurrentExecutor, t::Task)
    me = current_task()
    if me == t
        throw(ConcurrencyViolationError("Can't block on the currently running task!"))
    end
    while true
        if istaskfailed(t)
            @info "Task failed!" Cur=me T=t
            return nothing
        end
        if istaskdone(t)
            @debug "No longer blocked!" Cur=me T=t
            # `Base.task_result` is a non-exported Base function.
            return Some(Base.task_result(t))
        end

        rt = pe.runtime
        @debug "Still blocked!" Cur=me T=t
        # Choose and dequeue the next event while holding the runtime lock.
        event = lock(rt) do
            @debug rt.runnables
            isempty(rt.runnables) && throw(ConcurrencyViolationError("$me cannot make progress!"))
            # Alternative choice source considered by the author:
            # Supposition.choice!(Supposition.CURRENT_TESTCASE[], eachindex(rt.runnables))
            picked = rand(rt.rng, 1:length(rt.runnables))
            swap_remove!(rt.runnables, picked)
        end
        notify(event)
        # NOTE: the original author tried `yield()` and `wait(rt.event)` here
        # and observed that `yield` does not behave the same as sleeping; the
        # reason is unresolved, so the short sleep is kept.
        sleep(0.001)
    end
end
"""
    @spawn exec thunk

Expand to the code produced by `build_executor(exec, thunk)`, which runs
`thunk` as a task on the executor that `exec` evaluates to.
"""
macro spawn(exec, thunk)
    return build_executor(exec, thunk)
end
"""
    build_executor(exec, thunk)

Build the expression that the `@spawn` macro expands to.

`exec` and `thunk` are unevaluated expressions; both are escaped so that they
are resolved in the caller's scope. The generated code evaluates `exec` once
and then:

- for a `ConcurrentExecutor`, creates a task that first waits on a fresh
  `Threads.Event` and then runs `thunk`; the event is pushed onto the
  runtime's `runnables` under the runtime lock, and the non-sticky task is
  scheduled (the generated block evaluates to that task);
- for a `BaseExecutor`, runs `thunk` via `Threads.@spawn`;
- for anything else, throws `ArgumentError`.
"""
function build_executor(exec, thunk)
    # Unique name for the per-task event inside the generated code.
    run_event = gensym(:run_event)
    return quote
        executor = $(esc(exec))
        if executor isa ConcurrentExecutor
            $run_event = $Threads.Event(true)
            t = @task begin
                # The task stays parked until the runtime notifies its event.
                wait($run_event)
                res = $(esc(thunk))
                res
            end
            lock(executor.runtime) do
                push!(executor.runtime.runnables, $run_event)
                @debug "Pushed to runtime!" T=t
            end
            t.sticky = false
            schedule(t)
        elseif executor isa BaseExecutor
            $Threads.@spawn $(esc(thunk))
        else
            throw(ArgumentError("Unsupported Executor!"))
        end
    end
end
"""
    run_test(exe::Executor)

Spawn a root task on `exe` that in turn spawns two sub-tasks sharing a
counter protected by a lock: the first increments the counter only if it is
`0`, the second only if it is `1`. The root task waits for both and yields
the final counter value.

Returns `true` when blocking on the root task produces `2`, i.e. when the
first sub-task's critical section ran before the second's; `false`
otherwise (including when `block_on` returns `nothing`).
"""
function run_test(exe::Executor)
    root = @spawn exe begin
        @debug "Starting test" Cur=current_task()
        guard = ReentrantLock()
        counter = Ref(0)
        first_future = @spawn exe begin
            @debug "Future 1 scheduled"
            @lock guard begin
                counter[] == 0 && (counter[] += 1)
            end
            @debug "Done with 1!"
        end
        second_future = @spawn exe begin
            @debug "Future 2 scheduled"
            @lock guard begin
                counter[] == 1 && (counter[] += 1)
            end
            @debug "Done with 2!"
        end
        wait(first_future)
        wait(second_future)
        @debug "Done with subtasks!"
        @lock guard counter[]
    end
    # `block_on` may return a raw value, `Some(value)` or `nothing`;
    # normalise all three to a plain value (or `nothing`).
    outcome = @something block_on(exe, root) Some(nothing)
    return outcome == 2
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment