public
Created

  • Download Gist
Channel.fs
F#
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
open System
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks
 
/// An unbounded channel that pairs sent values with receivers in FIFO order.
///
/// A value passed to `Send` goes to the oldest waiting receiver if there is
/// one; otherwise it is buffered. A receiver registered through `Receive`
/// gets the oldest buffered value if there is one; otherwise it waits for
/// the next send. All queue and counter updates happen under a single lock.
[<Sealed>]
type Channel<'T>() =
    // Lock object guarding both queues and the balance counter.
    let gate = obj ()
    // Receivers that arrived while no value was buffered.
    let waiters = Queue<'T -> unit>()
    // Values that arrived while no receiver was waiting.
    let buffered = Queue<'T>()
    // Number of sends minus number of receives so far: a positive value
    // means values are buffered, a negative value means receivers wait.
    let mutable balance = 0

    // Delivers `x` to a waiting receiver, or buffers it when none waits.
    // The decision is taken under the lock; the receiver itself is invoked
    // only after the lock has been released.
    let enq x =
        let afterUnlock =
            lock gate (fun () ->
                let before = balance
                balance <- before + 1
                if before >= 0 then
                    buffered.Enqueue(x)
                    ignore
                else
                    let receiver = waiters.Dequeue()
                    fun () -> receiver x)
        afterUnlock ()

    // Hands the oldest buffered value to `f`, or parks `f` until a value is
    // sent. As in `enq`, `f` is invoked only after the lock is released.
    let deq f =
        let afterUnlock =
            lock gate (fun () ->
                let before = balance
                balance <- before - 1
                if before <= 0 then
                    waiters.Enqueue(f)
                    ignore
                else
                    let value = buffered.Dequeue()
                    fun () -> f value)
        afterUnlock ()

    // One shared async; every run of it registers its success continuation
    // as a receiver. The error and cancellation continuations are unused.
    let receiveAsync =
        Async.FromContinuations(fun (onValue, _, _) -> deq onValue)

    /// Returns an async computation that completes with the next value.
    member self.Receive() = receiveAsync

    /// Registers `f` to be called with the next value.
    member self.Receive(f) = deq f

    /// Sends `x` to the oldest waiting receiver, or buffers it.
    member self.Send(x) = enq x
 
/// Runs an asynchronous computation as a task and owns a channel through
/// which callers can send it messages.
///
/// `f` is given the agent's channel and returns the computation to run;
/// `opts` is forwarded to `Async.StartAsTask` as its second argument.
[<Sealed>]
type Agent<'T,'R>(f: Channel<'T> -> Async<'R>, opts) =
    // Channel handed to the agent body and fed by `Send`.
    let mailbox = Channel<'T>()
    // The computation is built and started once, at construction time.
    let running =
        let body = f mailbox
        Async.StartAsTask(body, opts)

    /// Sends `m` on the agent's channel.
    member self.Send(m) = mailbox.Send(m)

    /// The task started for the agent's computation.
    member self.Task = running
 
/// Factory methods for creating agents without spelling out type arguments.
[<Sealed>]
type Agent =
    /// Creates an agent from `f` using `TaskCreationOptions.None`.
    static member Start(f) =
        new Agent<_, _>(f, TaskCreationOptions.None)

    /// Creates an agent from `f` using the supplied task creation options.
    static member Start(f, opts) =
        new Agent<_, _>(f, opts)

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.