Skip to content

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
open System
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks
/// An unbounded channel that pairs sent values with receive callbacks in
/// FIFO order. Queue state is mutated only while holding an internal lock;
/// callbacks are invoked after the lock has been released.
[<Sealed>]
type Channel<'T>() =
    // Monitor object guarding both queues and the counter below.
    let gate = obj ()
    // Receive callbacks registered while no value was available.
    let waiters = Queue<'T -> unit>()
    // Values sent while no receive callback was waiting.
    let buffered = Queue<'T>()
    // Sends minus receives so far: a positive count means that many values
    // are buffered, a negative count means that many callbacks are waiting.
    let mutable balance = 0

    // Hands `x` to the oldest waiting callback, or buffers it when none waits.
    // The locked section only decides what to do and returns that decision as
    // a thunk, so the callback itself runs outside the lock.
    let enq x =
        let afterUnlock =
            lock gate (fun () ->
                let before = balance
                balance <- before + 1
                if before >= 0 then
                    // Nobody is waiting: keep the value for a later receive.
                    buffered.Enqueue(x)
                    ignore
                else
                    let waiter = waiters.Dequeue()
                    (fun () -> waiter x))
        afterUnlock ()

    // Feeds the oldest buffered value to `f`, or parks `f` until a value is
    // sent. Mirrors `enq`: the callback is run only after leaving the lock.
    let deq f =
        let afterUnlock =
            lock gate (fun () ->
                let before = balance
                balance <- before - 1
                if before <= 0 then
                    // Nothing buffered: remember the callback for a later send.
                    waiters.Enqueue(f)
                    ignore
                else
                    let value = buffered.Dequeue()
                    (fun () -> f value))
        afterUnlock ()

    // Built once and shared by every call to Receive(); each time it is run
    // it registers its success continuation via `deq`. The error and
    // cancellation continuations are never used.
    let receiveAsync =
        Async.FromContinuations(fun (onValue, _, _) -> deq onValue)

    /// Returns an async computation that yields the next value from the channel.
    member self.Receive() = receiveAsync

    /// Calls `f` with the next value: right away if one is buffered,
    /// otherwise when a later Send supplies it.
    member self.Receive(f) = deq f

    /// Delivers `x` to the oldest waiting receiver, or buffers it if none is waiting.
    member self.Send(x) = enq x
/// An agent that owns a private Channel and runs the computation produced by
/// `f` on that channel as a task. The task is started during construction,
/// with `opts` passed to Async.StartAsTask as the task creation options.
[<Sealed>]
type Agent<'T,'R>(f: Channel<'T> -> Async<'R>, opts) =
    // The agent's inbox; created first because the body receives it as its argument.
    let inbox = Channel<'T>()
    // Started eagerly, as part of constructing the agent.
    let running = Async.StartAsTask(f inbox, opts)

    /// Posts the message `m` to the agent's channel.
    member self.Send(m) = inbox.Send(m)

    /// The task running the agent's body.
    member self.Task = running
/// Factory methods for creating (and thereby starting) agents without
/// spelling out the generic type arguments.
[<Sealed>]
type Agent =
    /// Starts an agent whose body is `f`, using TaskCreationOptions.None.
    static member Start(f) = Agent<_, _>(f, TaskCreationOptions.None)

    /// Starts an agent whose body is `f`, using the supplied task creation options.
    static member Start(f, opts) = Agent<_, _>(f, opts)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.