Skip to content

Instantly share code, notes, and snippets.

@iitalics
Created January 12, 2024 17:03
Show Gist options
  • Save iitalics/0c76e571384b07425da4c9a47c2ce59e to your computer and use it in GitHub Desktop.
Save iitalics/0c76e571384b07425da4c9a47c2ce59e to your computer and use it in GitHub Desktop.
type 'a susp = ('a, unit) Effect.Shallow.continuation
type context =
| Top
| Sub of {
mutable n : int;
sus : unit susp;
sup : context;
}
type ready = Ready : 'a _ready -> ready [@unboxed]
and 'a _ready = {
ctx : context;
sus : 'a susp;
arg : 'a;
}
type executor = {
mutex : Mutex.t;
more : Condition.t;
queue : ready Lwt_dllist.t;
}
type _ Effect.t +=
| Async : (executor -> context -> 'a susp -> unit) -> 'a Effect.t
let enqueue_p exe ctx sus arg =
Lwt_dllist.add_r (Ready { ctx; sus; arg }) exe.queue |> ignore
let rec run_p exe =
match Lwt_dllist.take_l exe.queue with
| exception Lwt_dllist.Empty ->
Condition.wait exe.more exe.mutex;
run_p exe
| Ready { sus; arg; ctx } ->
Mutex.unlock exe.mutex;
Effect.Shallow.continue_with sus arg {
retc = (fun () -> resume_u exe ctx);
exnc = raise;
effc = function
| Async cb ->
Some (fun k -> cb exe ctx k; run_u exe)
| _ ->
None
}
and run_u exe =
Mutex.lock exe.mutex;
run_p exe
and resume_u exe = function
| Top -> ((* done *))
| Sub ctx ->
Mutex.lock exe.mutex;
ctx.n <- ctx.n - 1;
if ctx.n = 0 then
enqueue_p exe ctx.sup ctx.sus ();
run_p exe
let run f =
let exe = {
mutex = Mutex.create ();
more = Condition.create ();
queue = Lwt_dllist.create ();
} in
Mutex.lock exe.mutex;
enqueue_p exe Top (Effect.Shallow.fiber f) ();
run_p exe
let enqueue exe ctx sus arg =
Mutex.protect exe.mutex
(fun () ->
enqueue_p exe ctx sus arg;
Condition.signal exe.more)
let async f =
Effect.perform (Async f)
let yield () =
async
(fun exe sup sus -> enqueue exe sup sus ())
let waitfor f x =
async
(fun exe ctx sus ->
let f () = enqueue exe ctx sus (f x) in
Domain.spawn f |> ignore)
let sleep n =
waitfor Unix.sleepf n
let both f g =
async
(fun exe sup sus ->
let ctx = Sub { n = 2; sus; sup } in
enqueue exe ctx (Effect.Shallow.fiber f) ();
enqueue exe ctx (Effect.Shallow.fiber g) ())
val run : (unit -> unit) -> unit
val yield : unit -> unit
val both : (unit -> unit) -> (unit -> unit) -> unit
val waitfor : ('a -> 'b) -> 'a -> 'b
val sleep : float -> unit
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment