Skip to content

Instantly share code, notes, and snippets.

@pchiusano
Last active October 19, 2021 01:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pchiusano/f5d899a59b6a1c3de1f7518f7ad89f2b to your computer and use it in GitHub Desktop.
Save pchiusano/f5d899a59b6a1c3de1f7518f7ad89f2b to your computer and use it in GitHub Desktop.
Cooperative threading
-- Scratch watch expressions exercising the pure (cooperative) runner.

-- Map over a list with `List.parMap` inside `Async.pure.run`.
> Async.pure.run 'let
  List.parMap (n -> n * 10) [1,2,3,4,5]

-- Two forked tasks communicate through an initially empty task `cell`:
-- `reader` awaits it and adds one, `writer` completes it with 9.
> Async.pure.run 'let
  use Async put fork await empty!
  cell = empty!
  reader = fork '(await cell + 1)
  writer = fork '(put 9 cell)
  Async.await reader

-- Plain sequential map, for comparison.
> List.map (n -> n * 100) [1,2,3,4,5]
-- Completes the task `t` with the value `a`, by passing a `Right` outcome to
-- `Async.complete`.
Async.put : a -> t a -> ()
Async.put a t =
  outcome = Right a
  Async.complete outcome t
use .base.M2h Scope
-- A mutable cell for the pure scheduler, built from two `Scope` references:
--   1. the current contents (`None` when empty, `Some a` when full);
--   2. the callbacks registered on the cell. `Concurrent.run.pure.impl` moves
--      them onto its run queue, applied to the new contents, whenever it
--      changes the contents through its `set` helper.
-- `s` is the `Scope` region and `g` the extra abilities callbacks may use.
unique type PureMVar s g a
  = PureMVar (Ref {Scope s} (Optional a))
             (Ref {Scope s} [Optional a ->{Scope s, g} ()])
-- Concurrency primitives over a cell type `t` (a variable that is either
-- empty or holds one value). `g` is the set of extra abilities that forked
-- computations may use. In this file `t` is instantiated to `PureMVar s g`
-- by `Concurrent.run.pure.impl` and to `MVar` by `Concurrent.run.impl`.
unique ability Concurrent t g where
  -- Create a new, empty cell.
  empty! : t a
  -- Create a new cell that already holds the given value.
  full : a -> t a
  -- Store a value in the cell, waiting while the cell is full.
  put : a -> t a -> ()
  -- Store a value only if the cell is empty; returns whether it was stored.
  tryPut : a -> t a -> Boolean
  -- Remove and return the cell's value, waiting while the cell is empty.
  take : t a -> a
  -- Remove and return the cell's value if there is one.
  tryTake : t a -> Optional a
  -- Return the cell's value without removing it, waiting while it is empty.
  read : t a -> a
  -- Return the cell's value, if any, without removing it.
  tryRead : t a -> Optional a
  -- Whether the cell currently holds no value.
  isEmpty : t a -> Boolean
  -- Start a new thread running the given computation.
  fork : '{g, Concurrent t g} () -> ()
  -- Like `fork`, but the new thread is stopped once the given cell is filled.
  forkInterruptible : t x -> '{g, Concurrent t g} () -> ()
  -- NOTE(review): presumably pauses the calling thread for the given number
  -- of microseconds; no handler case for this operation is visible in this
  -- file, so confirm the intended semantics.
  sleepMicroseconds : Nat -> ()
-- Task representation used by `Async.toConcurrent`: a cell holding the task's
-- `Either Failure a` outcome, plus a Boolean that `Async.toConcurrent` treats
-- as a read-only flag (a `tryComplete` on a read-only task does not write to
-- the cell).
unique type Concurrent.T t a = T Boolean (t (Either Failure a))
-- Runs `a` as a forked task and awaits its result, so that the returned
-- computation only requires `Async t g` rather than `g` directly.
Async.embed : '{g, Async t g} a -> '{Async t g} a
Async.embed a _ =
  task = Async.fork a
  Async.await task
-- Runs an `Async` program on the pure cooperative scheduler: the program is
-- embedded, translated to `Concurrent`, and any failure it raises is returned
-- as a `Left` by `catch`.
Async.pure.run : (forall t . '{Async t g, g} a) ->{g} Either Failure a
Async.pure.run a =
  Concurrent.run.pure (_ -> catch (Async.toConcurrent (Async.embed a)))
-- Runs a `Concurrent` program on real threads via `run.impl`, returning any
-- raised failure as a `Left` instead of propagating it.
Concurrent.tryRun : (forall t . '{IO,Exception,Concurrent t {IO,Exception}} a) ->{IO} Either Failure a
Concurrent.tryRun a =
  catch (_ -> run.impl (Concurrent.embed a))
-- Runs `c` on a forked thread and hands its result back through a fresh cell,
-- so that the returned computation only requires `Concurrent t g`.
Concurrent.embed : '{g, Concurrent t g} a -> '{Concurrent t g} a
Concurrent.embed c _ =
  slot = Concurrent.empty!
  Concurrent.fork 'let
    value = !c
    Concurrent.put value slot
  -- Wait here until the forked thread has delivered its result.
  Concurrent.take slot
-- Runs a `Concurrent` program on the pure scheduler inside a fresh `Scope`,
-- which provides the mutable references the scheduler needs.
Concurrent.run.pure : (forall t . '{Concurrent t g, g} a) ->{g} a
Concurrent.run.pure t =
  Scope.run (_ -> run.pure.impl (Concurrent.embed t))
-- Runs a `Concurrent` program on a single thread with a cooperative
-- scheduler whose state lives in `Scope` references.
--
--   * `q` is the run queue of suspended threads.
--   * Each `PureMVar` holds its contents plus a list of callbacks; `set`
--     moves those callbacks onto the run queue whenever it changes the
--     contents.
--   * `go isDone putResult` interprets one request of a thread. `isDone` is
--     polled before every request, so a thread stops being stepped once it
--     reports true; `putResult` receives the thread's final value.
--
-- The main program's result is stored in `finalResult`. If the run queue
-- empties before a result has been stored, `bug "deadlock detected"` is
-- called.
Concurrent.run.pure.impl : (forall t . '{Concurrent t g} a) ->{Scope s, g} a
Concurrent.run.pure.impl prog =
  q : Ref {Scope s} ['{Scope s, g} ()]
  q = Scope.ref []
  -- Append a suspended thread to the back of the run queue.
  enq : '{Scope s, g} () ->{Scope s} ()
  enq t =
    Ref.write q (Ref.read q :+ t)
  isFull : PureMVar s g x ->{Scope s} Boolean
  isFull = cases PureMVar r _ -> match Ref.read r with
    Some _ -> true
    _ -> false
  -- Replace the cell's contents with `o` and schedule every registered
  -- callback, applied to `o`, on the run queue. The callback list is cleared,
  -- so anything that still needs to wait must register itself again.
  set : Optional x -> PureMVar s g x ->{Scope s} ()
  set o = cases PureMVar r cbs -> match Ref.read cbs with
    listening ->
      ws = List.map (w -> '(w o)) listening
      Ref.write q (Ref.read q ++ ws)
      Ref.write r o
      Ref.write cbs []
  go : '{Scope s} Boolean -> (x ->{Scope s} ())
    -> Request {Concurrent (PureMVar s g) g} x ->{Scope s, g} ()
  go isDone putResult req39 =
    if !isDone then ()
    else match req39 with
      { a } -> putResult a
      { Concurrent.fork a -> k } ->
        -- The child is queued with a result sink that discards its value;
        -- the parent keeps running immediately.
        enq '(handle !a with go isDone (const ()))
        handle !k with go isDone putResult
      { Concurrent.forkInterruptible cancel a -> k } ->
        -- The child additionally stops as soon as `cancel` holds a value.
        isDone' _ = !isDone || isFull cancel
        enq '(handle !a with go isDone' (const ()))
        handle !k with go isDone putResult
      { Concurrent.sleepMicroseconds _ -> k } ->
        -- The pure scheduler has no clock, so a sleep is treated as a yield:
        -- the thread goes to the back of the run queue.
        enq '(handle !k with go isDone putResult)
      { Concurrent.full a -> k } ->
        r = PureMVar (Scope.ref (Some a)) (Scope.ref [])
        handle k r with go isDone putResult
      { Concurrent.empty! -> k } ->
        r = PureMVar (Scope.ref None) (Scope.ref [])
        handle k r with go isDone putResult
      { Concurrent.isEmpty mv -> k} ->
        r = not (isFull mv)
        handle k r with go isDone putResult
      { Concurrent.put a v@(PureMVar r waiting) -> k } ->
        match Ref.read r with
          None -> match Ref.read waiting with
            [] ->
              -- Nobody is waiting: store the value and keep running.
              Ref.write r (Some a)
              handle !k with go isDone putResult
            _ ->
              -- Waiters exist: yield, and let `set` wake them with the value.
              enq '(handle !k with go isDone putResult)
              set (Some a) v
          Some _ ->
            -- The cell is full, so this put has to wait. Park a callback that
            -- retries the whole request the next time the cell changes.
            -- Parking only the continuation would resume the thread without
            -- ever storing `a`.
            w = Ref.read waiting
            Ref.write waiting (w :+ (_ -> go isDone putResult req39))
      { Concurrent.tryPut a v@(PureMVar r _) -> k } -> match Ref.read r with
        None ->
          set (Some a) v
          handle k true with go isDone putResult
        Some _ ->
          handle k false with go isDone putResult
      { Concurrent.take v@(PureMVar r cbs) -> k } ->
        -- `await` is the callback parked on the cell. Woken with a value, it
        -- schedules the taker and empties the cell; woken with `None`, it
        -- goes through `step` again.
        -- NOTE(review): if several takers are parked when a value arrives,
        -- `set` passes that same value to each of them; confirm whether that
        -- is intended.
        await : forall x2 . PureMVar s g x2
          -> (x2 ->{Concurrent (PureMVar s g) g} x)
          -> Optional x2 ->{Scope s} ()
        await _ k = cases
          None -> !step
          Some o ->
            enq '(handle k o with go isDone putResult)
            set None v
        step _ = match Ref.read r with
          None ->
            Ref.write cbs (Ref.read cbs :+ await v k)
          Some a0 ->
            -- Already full: park and immediately notify, so that `await`
            -- receives the current value through the run queue.
            Ref.write cbs (Ref.read cbs :+ await v k)
            set (Some a0) v
        !step
      { Concurrent.tryTake v@(PureMVar r cbs) -> k } ->
        match Ref.read r with
          None -> handle k None with go isDone putResult
          Some a ->
            set None v
            handle k (Some a) with go isDone putResult
      { Concurrent.read v@(PureMVar r cbs) -> k } ->
        -- Same shape as `take`, except that the cell is left full.
        await : forall x2 . PureMVar s g x2
          -> (x2 ->{Concurrent (PureMVar s g) g} x)
          -> Optional x2 ->{Scope s} ()
        await _ k = cases
          None -> !doIt -- go isDone putResult req
          Some o -> enq '(handle k o with go isDone putResult)
        doIt _ = match Ref.read r with
          None -> Ref.write cbs (Ref.read cbs :+ await v k)
          Some a0 ->
            Ref.write cbs (Ref.read cbs :+ await v k)
            set (Some a0) v
        !doIt
      { Concurrent.tryRead (PureMVar r _) -> k } ->
        a = Ref.read r
        handle k a with go isDone putResult
  finalResult = Scope.ref None
  isDone _ = isSome (Ref.read finalResult)
  -- Run queued threads one at a time until the main result is available or
  -- the queue is empty. The head is removed from the queue before it runs,
  -- so anything it enqueues is kept.
  drainQueue _ =
    if !isDone then ()
    else match Ref.read q with
      [] -> ()
      (h +: t) ->
        Ref.write q t
        !h
        !drainQueue
  handle !prog with go isDone (a -> Ref.write finalResult (Some a))
  !drainQueue
  match Ref.read finalResult with
    None -> bug "deadlock detected"
    Some a -> a
-- Runs a `Concurrent` program on real threads. The program is first wrapped
-- with `Concurrent.embed`, then interpreted by `run.impl`; failures propagate
-- through `Exception`.
Concurrent.run : (forall t . '{Concurrent t {IO,Exception}, IO, Exception} a) ->{IO, Exception} a
Concurrent.run a = run.impl (Concurrent.embed a)
-- Interprets a `Concurrent` program with real threads: cells are `MVar`s and
-- forked computations are started with `io.fork`.
--
-- NOTE(review): the `Concurrent` ability declared alongside this handler also
-- has a `sleepMicroseconds` operation, which has no case below.
Concurrent.run.impl : (forall t . '{Concurrent t {IO,Exception}} a) ->{IO, Exception} a
Concurrent.run.impl a =
  -- Start `body` on a new thread. Its outcome is captured by `catch` and then
  -- discarded by `ignore`.
  spawn : '{IO,Exception} x ->{IO} ThreadId
  spawn body = io.fork (_ -> ignore (catch body))
  interpret : Request {Concurrent MVar {IO,Exception}} x ->{IO, Exception} x
  interpret = cases
    { result } -> result
    { Concurrent.fork child -> resume } ->
      childId = spawn '(handle !child with interpret)
      handle !resume with interpret
    { Concurrent.forkInterruptible signal child -> resume } ->
      childId = spawn '(handle !child with interpret)
      -- A second thread waits for `signal` to hold a value and then kills
      -- the child.
      watcherId = spawn 'let
        done = MVar.read signal
        io.kill childId
      handle !resume with interpret
    { Concurrent.full initial -> resume } ->
      cell = MVar.new initial
      handle resume cell with interpret
    { Concurrent.empty! -> resume } ->
      cell = !MVar.newEmpty
      handle resume cell with interpret
    { Concurrent.isEmpty mv -> resume } ->
      empty = MVar.isEmpty mv
      handle resume empty with interpret
    { Concurrent.put value mv -> resume } ->
      unit = MVar.put mv value
      handle resume unit with interpret
    { Concurrent.tryPut value mv -> resume } ->
      stored = MVar.tryPut mv value
      handle resume stored with interpret
    { Concurrent.take mv -> resume } ->
      taken = MVar.take mv
      handle resume taken with interpret
    { Concurrent.tryTake mv -> resume } ->
      taken = MVar.tryTake mv
      handle resume taken with interpret
    { Concurrent.read mv -> resume } ->
      current = MVar.read mv
      handle resume current with interpret
    { Concurrent.tryRead mv -> resume } ->
      current = MVar.tryRead mv
      handle resume current with interpret
  handle !a with interpret
-- Translates an `Async` program into a `Concurrent` one. A task is
-- represented as `T readOnly cell`, where `cell` receives the task's
-- `Either Failure` outcome.
Async.toConcurrent : (forall t . '{Async t g} a) -> '{Concurrent t g, Exception} a
Async.toConcurrent a _ =
  -- `parent` is the cell that is filled when the computation currently being
  -- interpreted returns or fails.
  step : t (Either Failure ()) -> Request {Async (T t) g} x ->{Concurrent t g, Exception} x
  step parent = cases
    { value } ->
      Concurrent.put (Right ()) parent
      value
    { Async.fail failure -> resume } ->
      -- The cell is filled with `Right ()` here as well; the failure itself
      -- is re-raised through `Exception`.
      Concurrent.put (Right ()) parent
      raise failure
    { Async.tryDetach child -> resume } ->
      outcomeCell = Concurrent.empty!
      -- The child is interruptible on its own outcome cell, so filling that
      -- cell from elsewhere stops it.
      Concurrent.forkInterruptible outcomeCell 'let
        outcome = catch '(handle !child with step Concurrent.empty!)
        Concurrent.put outcome outcomeCell
      handle resume (Right (T false outcomeCell)) with step parent
    { Async.tryComplete outcome (T readOnly cell) -> resume } ->
      -- Completing a read-only task reports success without writing.
      if readOnly then handle resume (Right ()) with step parent
      else
        reply = if Concurrent.tryPut outcome cell then Right () else Left alreadyCompleted
        handle resume reply with step parent
    { Async.tryAwait (T _ cell) -> resume } ->
      handle resume (Concurrent.read cell) with step parent
    { Async.current -> resume } ->
      -- The current task is exposed as a read-only view of `parent`.
      handle resume (T true parent) with step parent
    { Async.tryIsComplete (T _ cell) -> resume } ->
      handle resume (Right (not (Concurrent.isEmpty cell))) with step parent
    { Async.empty! -> resume } ->
      handle resume (T false Concurrent.empty!) with step parent
  handle !a with step Concurrent.empty!
-- idea: could you write a handler of `Concurrent` that checks a `t x` in
-- between every operation? Yes, seems like it.
---
-- A task handle for the cooperative scheduler, made of three operations
-- (named await, complete and isComplete by the doc literal below):
--   1. a delayed computation yielding the task's result, which may raise;
--   2. a function accepting an `Either Failure a` outcome;
--   3. a delayed computation returning a Boolean.
{{ ``PureTask await complete isComplete`` }}
unique type PureTask s g a
  = PureTask ('{Cooperative s g, Exception} a)
             (Either Failure a ->{Cooperative s g, g} ())
             ('{Cooperative s g} Boolean)
-- Work-in-progress interpreter of `Async` on top of `Cooperative`, with
-- `PureTask` as the task type. Only pure results, `Async.fail` and
-- `tryDetach` have cases here.
pure.run.impl : (∀ t. '{Async t g} a) ->{Cooperative s g, Exception, g} a
pure.run.impl async =
  -- todo - think about handling of parent task
  interpret : Request {Async (PureTask s g) g} x ->{Cooperative s g, Exception, g} x
  interpret = cases
    { value } -> value
    { Async.fail failure -> resume } -> raise failure
    { tryDetach child -> resume } ->
      -- The detached computation is forked and interpreted by this handler.
      task = Cooperative.fork '(handle !child with interpret)
      -- suspend!
      handle resume (Right task) with interpret
  handle !async with interpret
{-
up.async.Async.run : '{Async Async.Task {IO}} a ->{IO, Exception} a
up.async.Async.run a =
use io.MVar newEmpty
go : MVar (Either Failure ()) -> Request {Async Async.Task {IO}} x ->{IO, Exception} x
go parent = cases
{ a } ->
io.MVar.tryPut parent !Right
a
{Async.fail e -> k} ->
io.MVar.tryPut parent !Right
raise e
{tryDetach a -> k} ->
result = !newEmpty
tid =
io.fork
'let
e = catch '(handle !a with go !newEmpty)
unsafeRun! '(io.MVar.tryPut result e)
result' = fromMVar result |> onComplete '(io.kill tid)
handle k (Right result') with go parent
{Async.tryComplete e t -> k} -> handle k (catch '(Task.complete e t)) with go parent
{Async.tryAwait t -> k} -> handle k (catch '(Task.await t)) with go parent
{Async.tryParent! -> k} -> handle k (Right (fromMVarReadOnly parent)) with go parent
{Async.tryIsComplete t -> k} -> handle k (catch '(Task.isComplete t)) with go parent
{Async.empty! -> k} -> handle k !Task.empty with go parent
handle !a with go !newEmpty
-}
-- Peeks at the outcome stored in `mv` and passes it to `Exception.reraise`,
-- so the caller receives the value or has the stored failure raised.
Cooperative.await : PureMVar s g (Either Failure a) ->{Cooperative s g, Exception} a
Cooperative.await mv =
  outcome = Cooperative.peek mv
  Exception.reraise outcome
-- Work-in-progress cooperative scheduler for the `Cooperative` ability.
--
-- `q` is the run queue of suspended threads. `go isDone putResult` interprets
-- one request of a thread: `isDone` is polled before every request, and
-- `putResult` receives the thread's final value. The main program's result
-- is stored in `finalResult`; if the run queue empties before a result has
-- been stored, `bug "deadlock detected"` is called.
--
-- NOTE(review): `impl.get`, `impl.set`, `PureMVar.newEmpty`, `Empty`/`Full`
-- and `Peek`/`Take` are not defined in this file; their meaning below is
-- inferred from how they are used here.
Cooperative.run : '{Cooperative s g} a ->{Scope s, g} a
Cooperative.run prog =
  q : Ref {Scope s} ['{Scope s, g} ()]
  q = Scope.ref []
  go : '{Scope s} Boolean
    -> (x ->{Scope s} ())
    -> Request {Cooperative s g} x
    ->{Scope s, g} ()
  go isDone putResult req =
    if !isDone then ()
    else match req with
      { x } -> putResult x
      -- Yield: the rest of the thread goes to the back of the run queue.
      { suspend! -> k } -> Ref.write q (Ref.read q :+ (_ -> handle !k with go isDone putResult))
      { fork a -> k } ->
        result = !PureMVar.newEmpty
        -- The third component is the task's isComplete check, so it must be
        -- true once `result` holds a value, i.e. when it is NOT empty.
        t = PureTask '(Cooperative.await result) (e -> put e result) '(not (PureMVar.isEmpty result))
        -- NOTE(review): `a` itself is never scheduled here, so the forked
        -- computation does not run yet.
        handle k t with go isDone putResult
      { put a mv -> k } -> match impl.get mv with
        Empty [] ->
          impl.set (Full a []) mv
          handle !k with go isDone putResult
        Empty ((peek,cb) +: waiting) ->
          cb a
          impl.set (Empty waiting) mv
          match peek with
            -- A peeking waiter does not consume the value, so the put is
            -- dispatched again against the remaining waiters.
            Peek -> go isDone putResult req
            -- A taking waiter consumed the value: the put is complete and the
            -- putting thread carries on.
            Take -> handle !k with go isDone putResult
        Full a0 w ->
          -- The cell keeps its current value `a0`; the new value waits in the
          -- queue together with the continuation to resume.
          impl.set (Full a0 (w :+ (a, _ -> handle !k with go isDone putResult))) mv
  finalResult = Scope.ref None
  isDone _ = isSome (Ref.read finalResult)
  -- Run queued threads one at a time until the main result is available or
  -- the queue is empty. The head is removed from the queue BEFORE it runs, so
  -- tasks it enqueues are not overwritten by a stale tail.
  drainQueue _ =
    if !isDone then ()
    else match Ref.read q with
      [] -> ()
      (h +: t) ->
        Ref.write q t
        !h
        !drainQueue
  handle !prog with go isDone (a -> Ref.write finalResult (Some a))
  !drainQueue
  match Ref.read finalResult with
    None -> bug "deadlock detected"
    Some a -> a
-- Scratch watch expressions exercising the pure (cooperative) runner.

-- Map over a list with `List.parMap` inside `Async.pure.run`.
> Async.pure.run 'let
  List.parMap (n -> n * 10) [1,2,3,4,5]

-- Two forked tasks communicate through an initially empty task `cell`:
-- `reader` awaits it and adds one, `writer` completes it with 9.
> Async.pure.run 'let
  use Async put fork await empty!
  cell = empty!
  reader = fork '(await cell + 1)
  writer = fork '(put 9 cell)
  Async.await reader

-- Plain sequential map, for comparison.
> List.map (n -> n * 100) [1,2,3,4,5]
use .base.M2h Scope
-- A mutable cell for the pure scheduler, stored in a single `Scope`
-- reference holding a pair:
--   * the current contents (`None` when empty, `Some a` when full);
--   * the callbacks registered on the cell. `Concurrent.run.pure.impl` moves
--     them onto its run queue, applied to the new contents, whenever it
--     changes the contents through its `set` helper.
-- `s` is the `Scope` region and `g` the extra abilities callbacks may use.
unique type PureMVar s g a
  = PureMVar (Ref {Scope s} (Optional a, [Optional a ->{Scope s, g} ()]))
-- Concurrency primitives over a cell type `t` (a variable that is either
-- empty or holds one value). `g` is the set of extra abilities that forked
-- computations may use. In this file `t` is instantiated to `PureMVar s g`
-- by `Concurrent.run.pure.impl` and to `MVar` by `Concurrent.run.impl`.
unique ability Concurrent t g where
  -- Create a new, empty cell.
  empty! : t a
  -- Create a new cell that already holds the given value.
  full : a -> t a
  -- Store a value in the cell, waiting while the cell is full.
  put : a -> t a -> ()
  -- Store a value only if the cell is empty; returns whether it was stored.
  tryPut : a -> t a -> Boolean
  -- Remove and return the cell's value, waiting while the cell is empty.
  take : t a -> a
  -- Remove and return the cell's value if there is one.
  tryTake : t a -> Optional a
  -- Return the cell's value without removing it, waiting while it is empty.
  read : t a -> a
  -- Return the cell's value, if any, without removing it.
  tryRead : t a -> Optional a
  -- Whether the cell currently holds no value.
  isEmpty : t a -> Boolean
  -- Start a new thread running the given computation.
  fork : '{g, Concurrent t g} () -> ()
  -- Like `fork`, but the new thread is stopped once the given cell is filled.
  forkInterruptible : t x -> '{g, Concurrent t g} () -> ()
  --
-- Task representation used by `Async.toConcurrent`: a cell holding the task's
-- `Either Failure a` outcome, plus a Boolean that `Async.toConcurrent` treats
-- as a read-only flag (a `tryComplete` on a read-only task does not write to
-- the cell).
unique type Concurrent.T t a = T Boolean (t (Either Failure a))
-- Runs an `Async` program on the pure cooperative scheduler: the program is
-- embedded, translated to `Concurrent`, and any failure it raises is returned
-- as a `Left` by `catch`.
Async.pure.run : (forall t . '{Async t g, g} a) ->{g} Either Failure a
Async.pure.run a =
  Concurrent.run.pure (_ -> catch (Async.toConcurrent (Async.embed a)))
-- Runs a `Concurrent` program on real threads via `run.impl`, returning any
-- raised failure as a `Left` instead of propagating it.
Concurrent.tryRun : (forall t . '{IO,Exception,Concurrent t {IO,Exception}} a) ->{IO} Either Failure a
Concurrent.tryRun a =
  catch (_ -> run.impl (Concurrent.embed a))
-- Runs `c` on a forked thread and hands its result back through a fresh cell,
-- so that the returned computation only requires `Concurrent t g`.
Concurrent.embed : '{g, Concurrent t g} a -> '{Concurrent t g} a
Concurrent.embed c _ =
  slot = Concurrent.empty!
  Concurrent.fork 'let
    value = !c
    Concurrent.put value slot
  -- Wait here until the forked thread has delivered its result.
  Concurrent.take slot
-- Runs a `Concurrent` program on the pure scheduler inside a fresh `Scope`,
-- which provides the mutable references the scheduler needs.
Concurrent.run.pure : (forall t . '{Concurrent t g, g} a) ->{g} a
Concurrent.run.pure t =
  Scope.run (_ -> run.pure.impl (embed t))
-- Runs a `Concurrent` program on a single thread with a cooperative
-- scheduler whose state lives in `Scope` references.
--
--   * `q` is the run queue of suspended threads.
--   * Each `PureMVar` holds a pair of its contents and a list of callbacks;
--     `set` moves those callbacks onto the run queue whenever it changes the
--     contents.
--   * `go isDone putResult` interprets one request of a thread. `isDone` is
--     polled before every request, so a thread stops being stepped once it
--     reports true; `putResult` receives the thread's final value.
--
-- The main program's result is stored in `finalResult`. If the run queue
-- empties before a result has been stored, `bug "deadlock detected"` is
-- called.
Concurrent.run.pure.impl : (forall t . '{Concurrent t g} a) ->{Scope s, g} a
Concurrent.run.pure.impl prog =
  q : Ref {Scope s} ['{Scope s, g} ()]
  q = Scope.ref []
  -- Append a suspended thread to the back of the run queue.
  enq : '{Scope s, g} () ->{Scope s} ()
  enq t =
    Ref.write q (Ref.read q :+ t)
  isFull : PureMVar s g x ->{Scope s} Boolean
  isFull = cases PureMVar r -> match Ref.read r with
    (Some _, _) -> true
    _ -> false
  -- Replace the cell's contents with `o` and schedule every registered
  -- callback, applied to `o`, on the run queue. The callback list is cleared,
  -- so anything that still needs to wait must register itself again.
  set : Optional x -> PureMVar s g x ->{Scope s} ()
  set o = cases PureMVar r -> match Ref.read r with
    (_, listening) ->
      ws = (List.map (w -> '(w o)) listening)
      Ref.write q (Ref.read q ++ ws)
      Ref.write r (o, [])
  go : '{Scope s} Boolean -> (x ->{Scope s} ())
    -> Request {Concurrent (PureMVar s g) g} x ->{Scope s, g} ()
  go isDone putResult req39 =
    if !isDone then ()
    else match req39 with
      { a } -> putResult a
      { Concurrent.fork a -> k } ->
        -- The child is queued with a result sink that discards its value;
        -- the parent keeps running immediately.
        enq '(handle !a with go isDone (const ()))
        handle !k with go isDone putResult
      { Concurrent.forkInterruptible cancel a -> k } ->
        -- The child additionally stops as soon as `cancel` holds a value.
        isDone' _ = !isDone || isFull cancel
        enq '(handle !a with go isDone' (const ()))
        handle !k with go isDone putResult
      { Concurrent.full a -> k } ->
        r = PureMVar (Scope.ref (Some a, []))
        handle k r with go isDone putResult
      { Concurrent.empty! -> k } ->
        r = PureMVar (Scope.ref (None, []))
        handle k r with go isDone putResult
      { Concurrent.isEmpty mv -> k} ->
        r = not (isFull mv)
        handle k r with go isDone putResult
      { Concurrent.put a v@(PureMVar mv) -> k } ->
        match Ref.read mv with
          (None, []) ->
            -- Nobody is waiting: store the value and keep running.
            Ref.write mv (Some a, [])
            handle !k with go isDone putResult
          (None, _) ->
            -- Waiters exist: yield, and let `set` wake them with the value.
            enq '(handle !k with go isDone putResult)
            set (Some a) v
          (Some a0, w) ->
            -- The cell is full, so this put has to wait. Park a callback that
            -- retries the whole request the next time the cell changes.
            -- Parking only the continuation would resume the thread without
            -- ever storing `a`.
            Ref.write mv (Some a0, w :+ (_ -> go isDone putResult req39))
      { Concurrent.tryPut a v@(PureMVar mv) -> k } -> match Ref.read mv with
        (None, ls) ->
          set (Some a) v
          handle k true with go isDone putResult
        (Some _, ls) ->
          handle k false with go isDone putResult
      { Concurrent.take v@(PureMVar mv) -> k } ->
        -- `await` is the callback parked on the cell. Woken with a value, it
        -- schedules the taker and empties the cell (a take must consume the
        -- value); woken with `None`, it goes through `doIt` again.
        -- NOTE(review): if several takers are parked when a value arrives,
        -- `set` passes that same value to each of them; confirm whether that
        -- is intended.
        await : forall x2 . PureMVar s g x2
          -> (x2 ->{Concurrent (PureMVar s g) g} x)
          -> Optional x2 ->{Scope s} ()
        await _ k = cases
          None -> !doIt -- go isDone putResult req
          Some o ->
            enq '(handle k o with go isDone putResult)
            set None v
        doIt _ = match Ref.read mv with
          (None, ws) ->
            Ref.write mv (None, ws :+ await v k)
          (Some a0, ws) ->
            -- Already full: park and notify with the CURRENT value so that
            -- `await` receives it. Notifying with `None` here would discard
            -- the value and leave the taker waiting forever.
            Ref.write mv (Some a0, ws :+ await v k)
            set (Some a0) v
        !doIt
      { Concurrent.tryTake v@(PureMVar mv) -> k } ->
        match Ref.read mv with
          (None, ws) -> handle k None with go isDone putResult
          (Some a, ws) ->
            set None v
            handle k (Some a) with go isDone putResult
      { Concurrent.read v@(PureMVar mv) -> k } ->
        -- Same shape as `take`, except that the cell is left full.
        await : forall x2 . PureMVar s g x2
          -> (x2 ->{Concurrent (PureMVar s g) g} x)
          -> Optional x2 ->{Scope s} ()
        await _ k = cases
          None -> !doIt -- go isDone putResult req
          Some o -> enq '(handle k o with go isDone putResult)
        doIt _ = match Ref.read mv with
          (None, ws) -> Ref.write mv (None, ws :+ await v k)
          (Some a0, ws) ->
            Ref.write mv (Some a0, ws :+ await v k)
            set (Some a0) v
        !doIt
      { Concurrent.tryRead v@(PureMVar mv) -> k } ->
        match Ref.read mv with (a, ws) -> handle k a with go isDone putResult
  finalResult = Scope.ref None
  isDone _ = isSome (Ref.read finalResult)
  -- Run queued threads one at a time until the main result is available or
  -- the queue is empty. The head is removed from the queue before it runs,
  -- so anything it enqueues is kept.
  drainQueue _ =
    if !isDone then ()
    else match Ref.read q with
      [] -> ()
      (h +: t) ->
        Ref.write q t
        !h
        !drainQueue
  handle !prog with go isDone (a -> Ref.write finalResult (Some a))
  !drainQueue
  match Ref.read finalResult with
    None -> bug "deadlock detected"
    Some a -> a
-- Runs a `Concurrent` program on real threads. The program is first wrapped
-- with `Concurrent.embed`, then interpreted by `run.impl`; failures propagate
-- through `Exception`.
Concurrent.run : (forall t . '{Concurrent t {IO,Exception}, IO, Exception} a) ->{IO, Exception} a
Concurrent.run a = run.impl (Concurrent.embed a)
-- Interprets a `Concurrent` program with real threads: cells are `MVar`s and
-- forked computations are started with `io.fork`.
Concurrent.run.impl : (forall t . '{Concurrent t {IO,Exception}} a) ->{IO, Exception} a
Concurrent.run.impl a =
  -- Start `body` on a new thread. Its outcome is captured by `catch` and then
  -- discarded by `ignore`.
  spawn : '{IO,Exception} x ->{IO} ThreadId
  spawn body = io.fork (_ -> ignore (catch body))
  interpret : Request {Concurrent MVar {IO,Exception}} x ->{IO, Exception} x
  interpret = cases
    { result } -> result
    { Concurrent.fork child -> resume } ->
      childId = spawn '(handle !child with interpret)
      handle !resume with interpret
    { Concurrent.forkInterruptible signal child -> resume } ->
      childId = spawn '(handle !child with interpret)
      -- A second thread waits for `signal` to hold a value and then kills
      -- the child.
      watcherId = spawn 'let
        done = MVar.read signal
        io.kill childId
      handle !resume with interpret
    { Concurrent.full initial -> resume } ->
      cell = MVar.new initial
      handle resume cell with interpret
    { Concurrent.empty! -> resume } ->
      cell = !MVar.newEmpty
      handle resume cell with interpret
    { Concurrent.isEmpty mv -> resume } ->
      empty = MVar.isEmpty mv
      handle resume empty with interpret
    { Concurrent.put value mv -> resume } ->
      unit = MVar.put mv value
      handle resume unit with interpret
    { Concurrent.tryPut value mv -> resume } ->
      stored = MVar.tryPut mv value
      handle resume stored with interpret
    { Concurrent.take mv -> resume } ->
      taken = MVar.take mv
      handle resume taken with interpret
    { Concurrent.tryTake mv -> resume } ->
      taken = MVar.tryTake mv
      handle resume taken with interpret
    { Concurrent.read mv -> resume } ->
      current = MVar.read mv
      handle resume current with interpret
    { Concurrent.tryRead mv -> resume } ->
      current = MVar.tryRead mv
      handle resume current with interpret
  handle !a with interpret
-- Translates an `Async` program into a `Concurrent` one. A task is
-- represented as `T readOnly cell`, where `cell` receives the task's
-- `Either Failure` outcome.
Async.toConcurrent : (forall t . '{Async t g} a) -> '{Concurrent t g, Exception} a
Async.toConcurrent a _ =
  -- `parent` is the cell that is filled when the computation currently being
  -- interpreted returns or fails.
  step : t (Either Failure ()) -> Request {Async (T t) g} x ->{Concurrent t g, Exception} x
  step parent = cases
    { value } ->
      Concurrent.put (Right ()) parent
      value
    { Async.fail failure -> resume } ->
      -- The cell is filled with `Right ()` here as well; the failure itself
      -- is re-raised through `Exception`.
      Concurrent.put (Right ()) parent
      raise failure
    { Async.tryDetach child -> resume } ->
      outcomeCell = Concurrent.empty!
      -- The child is interruptible on its own outcome cell, so filling that
      -- cell from elsewhere stops it.
      Concurrent.forkInterruptible outcomeCell 'let
        outcome = catch '(handle !child with step Concurrent.empty!)
        Concurrent.put outcome outcomeCell
      handle resume (Right (T false outcomeCell)) with step parent
    { Async.tryComplete outcome (T readOnly cell) -> resume } ->
      -- Completing a read-only task reports success without writing.
      if readOnly then handle resume (Right ()) with step parent
      else
        reply = if Concurrent.tryPut outcome cell then Right () else Left alreadyCompleted
        handle resume reply with step parent
    { Async.tryAwait (T _ cell) -> resume } ->
      handle resume (Concurrent.read cell) with step parent
    { Async.tryParent! -> resume } ->
      -- The parent is exposed as a read-only view of the `parent` cell.
      handle resume (Right (T true parent)) with step parent
    { Async.tryIsComplete (T _ cell) -> resume } ->
      handle resume (Right (not (Concurrent.isEmpty cell))) with step parent
    { Async.empty! -> resume } ->
      handle resume (T false Concurrent.empty!) with step parent
  handle !a with step Concurrent.empty!
-- idea: could you write a handler of `Concurrent` that checks a `t x` in
-- between every operation? Yes, seems like it.
---
-- A task handle for the cooperative scheduler, made of three operations
-- (named await, complete and isComplete by the doc literal below):
--   1. a delayed computation yielding the task's result, which may raise;
--   2. a function accepting an `Either Failure a` outcome;
--   3. a delayed computation returning a Boolean.
{{ ``PureTask await complete isComplete`` }}
unique type PureTask s g a
  = PureTask ('{Cooperative s g, Exception} a)
             (Either Failure a ->{Cooperative s g, g} ())
             ('{Cooperative s g} Boolean)
-- Work-in-progress interpreter of `Async` on top of `Cooperative`, with
-- `PureTask` as the task type. Only pure results, `Async.fail` and
-- `tryDetach` have cases here.
pure.run.impl : (∀ t. '{Async t g} a) ->{Cooperative s g, Exception, g} a
pure.run.impl async =
  -- todo - think about handling of parent task
  interpret : Request {Async (PureTask s g) g} x ->{Cooperative s g, Exception, g} x
  interpret = cases
    { value } -> value
    { Async.fail failure -> resume } -> raise failure
    { tryDetach child -> resume } ->
      -- The detached computation is forked and interpreted by this handler.
      task = Cooperative.fork '(handle !child with interpret)
      -- suspend!
      handle resume (Right task) with interpret
  handle !async with interpret
{-
up.async.Async.run : '{Async Async.Task {IO}} a ->{IO, Exception} a
up.async.Async.run a =
use io.MVar newEmpty
go : MVar (Either Failure ()) -> Request {Async Async.Task {IO}} x ->{IO, Exception} x
go parent = cases
{ a } ->
io.MVar.tryPut parent !Right
a
{Async.fail e -> k} ->
io.MVar.tryPut parent !Right
raise e
{tryDetach a -> k} ->
result = !newEmpty
tid =
io.fork
'let
e = catch '(handle !a with go !newEmpty)
unsafeRun! '(io.MVar.tryPut result e)
result' = fromMVar result |> onComplete '(io.kill tid)
handle k (Right result') with go parent
{Async.tryComplete e t -> k} -> handle k (catch '(Task.complete e t)) with go parent
{Async.tryAwait t -> k} -> handle k (catch '(Task.await t)) with go parent
{Async.tryParent! -> k} -> handle k (Right (fromMVarReadOnly parent)) with go parent
{Async.tryIsComplete t -> k} -> handle k (catch '(Task.isComplete t)) with go parent
{Async.empty! -> k} -> handle k !Task.empty with go parent
handle !a with go !newEmpty
-}
-- Peeks at the outcome stored in `mv` and passes it to `Exception.reraise`,
-- so the caller receives the value or has the stored failure raised.
Cooperative.await : PureMVar s g (Either Failure a) ->{Cooperative s g, Exception} a
Cooperative.await mv =
  outcome = Cooperative.peek mv
  Exception.reraise outcome
-- Work-in-progress cooperative scheduler for the `Cooperative` ability.
--
-- `q` is the run queue of suspended threads. `go isDone putResult` interprets
-- one request of a thread: `isDone` is polled before every request, and
-- `putResult` receives the thread's final value. The main program's result
-- is stored in `finalResult`; if the run queue empties before a result has
-- been stored, `bug "deadlock detected"` is called.
--
-- NOTE(review): `impl.get`, `impl.set`, `PureMVar.newEmpty`, `Empty`/`Full`
-- and `Peek`/`Take` are not defined in this file; their meaning below is
-- inferred from how they are used here.
Cooperative.run : '{Cooperative s g} a ->{Scope s, g} a
Cooperative.run prog =
  q : Ref {Scope s} ['{Scope s, g} ()]
  q = Scope.ref []
  go : '{Scope s} Boolean
    -> (x ->{Scope s} ())
    -> Request {Cooperative s g} x
    ->{Scope s, g} ()
  go isDone putResult req =
    if !isDone then ()
    else match req with
      { x } -> putResult x
      -- Yield: the rest of the thread goes to the back of the run queue.
      { suspend! -> k } -> Ref.write q (Ref.read q :+ (_ -> handle !k with go isDone putResult))
      { fork a -> k } ->
        result = !PureMVar.newEmpty
        -- The third component is the task's isComplete check, so it must be
        -- true once `result` holds a value, i.e. when it is NOT empty.
        t = PureTask '(Cooperative.await result) (e -> put e result) '(not (PureMVar.isEmpty result))
        -- NOTE(review): `a` itself is never scheduled here, so the forked
        -- computation does not run yet.
        handle k t with go isDone putResult
      { put a mv -> k } -> match impl.get mv with
        Empty [] ->
          impl.set (Full a []) mv
          handle !k with go isDone putResult
        Empty ((peek,cb) +: waiting) ->
          cb a
          impl.set (Empty waiting) mv
          match peek with
            -- A peeking waiter does not consume the value, so the put is
            -- dispatched again against the remaining waiters.
            Peek -> go isDone putResult req
            -- A taking waiter consumed the value: the put is complete and the
            -- putting thread carries on.
            Take -> handle !k with go isDone putResult
        Full a0 w ->
          -- The cell keeps its current value `a0`; the new value waits in the
          -- queue together with the continuation to resume.
          impl.set (Full a0 (w :+ (a, _ -> handle !k with go isDone putResult))) mv
  finalResult = Scope.ref None
  isDone _ = isSome (Ref.read finalResult)
  -- Run queued threads one at a time until the main result is available or
  -- the queue is empty. The head is removed from the queue BEFORE it runs, so
  -- tasks it enqueues are not overwritten by a stale tail.
  drainQueue _ =
    if !isDone then ()
    else match Ref.read q with
      [] -> ()
      (h +: t) ->
        Ref.write q t
        !h
        !drainQueue
  handle !prog with go isDone (a -> Ref.write finalResult (Some a))
  !drainQueue
  match Ref.read finalResult with
    None -> bug "deadlock detected"
    Some a -> a
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment