open System | |
open System.Collections.Generic | |
open System.Threading | |
open System.Threading.Tasks | |
/// Unbounded FIFO channel that pairs sent values with receive callbacks.
/// The two queues and the counter are only touched while holding a single
/// monitor lock; the matched callback is invoked after the lock is released,
/// on whichever thread completed the pairing.
[<Sealed>]
type Channel<'T>() =
    let gate = obj ()
    // Receivers that arrived while no value was buffered.
    let waiters = Queue<'T -> unit>()
    // Values that arrived while no receiver was waiting.
    let buffered = Queue<'T>()
    // Sends minus receives so far: a positive count is the number of buffered
    // values, a negative count is the number of parked receivers.
    let mutable balance = 0

    // Hands `x` to the oldest parked receiver, or buffers it when none waits.
    let enq x =
        let afterUnlock =
            lock gate (fun () ->
                let before = balance
                balance <- before + 1
                if before < 0 then
                    let receiver = waiters.Dequeue()
                    (fun () -> receiver x)
                else
                    buffered.Enqueue(x)
                    ignore)
        // Run the receiver (or the no-op) outside the critical section.
        afterUnlock ()

    // Feeds the oldest buffered value to `f`, or parks `f` when nothing is buffered.
    let deq f =
        let afterUnlock =
            lock gate (fun () ->
                let before = balance
                balance <- before - 1
                if before > 0 then
                    let value = buffered.Dequeue()
                    (fun () -> f value)
                else
                    waiters.Enqueue(f)
                    ignore)
        // Run the callback (or the no-op) outside the critical section.
        afterUnlock ()

    // One shared async value; every run registers its own success continuation
    // via `deq`. The error and cancellation continuations are not used.
    let receiveAsync =
        Async.FromContinuations(fun (onValue, _, _) -> deq onValue)

    /// Async computation that completes with the next value taken from the channel.
    member self.Receive() = receiveAsync

    /// Callback form of receive: `f` is called with the next value, immediately if
    /// one is buffered, otherwise from a later `Send`.
    member self.Receive(f) = deq f

    /// Delivers `x` to the oldest waiting receiver, or buffers it until one arrives.
    member self.Send(x) = enq x
/// Agent that owns a private channel and runs the workflow `f` over it as a task.
/// The workflow is started with `Async.StartAsTask` while the agent is being
/// constructed, using `opts` as the task creation options.
[<Sealed>]
type Agent<'T, 'R>(f: Channel<'T> -> Async<'R>, opts: TaskCreationOptions) =
    // Channel through which the workflow receives the agent's messages.
    let inbox = Channel<'T>()
    // Task that carries the workflow's eventual result.
    let running = Async.StartAsTask(f inbox, opts)

    /// Posts message `m` to the agent's channel.
    member self.Send(m) = inbox.Send(m)

    /// The task running the agent's workflow.
    member self.Task = running
/// Factory functions for `Agent<'T,'R>`.
[<Sealed>]
type Agent =
    /// Starts an agent running `f` with `TaskCreationOptions.None`.
    static member Start(f: Channel<'T> -> Async<'R>) : Agent<'T, 'R> =
        new Agent<'T, 'R>(f, TaskCreationOptions.None)

    /// Starts an agent running `f` with the given task creation options.
    static member Start(f: Channel<'T> -> Async<'R>, opts: TaskCreationOptions) : Agent<'T, 'R> =
        new Agent<'T, 'R>(f, opts)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment