Skip to content

Instantly share code, notes, and snippets.

@t0yv0
Created October 3, 2012 14:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save t0yv0/3827112 to your computer and use it in GitHub Desktop.
Save t0yv0/3827112 to your computer and use it in GitHub Desktop.
open System
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks
[<Sealed>]
type Channel<'T>() =
let root = obj ()
let qF = Queue<'T->unit>()
let qX = Queue<'T>()
let mutable n = 0
let enq x =
lock root (fun () ->
let k = n
n <- n + 1
if k < 0 then
let f = qF.Dequeue()
fun () -> f x
else
qX.Enqueue(x)
ignore) ()
let deq f =
lock root (fun () ->
let k = n
n <- n - 1
if k > 0 then
let x = qX.Dequeue()
fun () -> f x
else
qF.Enqueue(f)
ignore) ()
let receiveAsync =
Async.FromContinuations(fun (ok, _, _) -> deq ok)
member this.Receive() = receiveAsync
member this.Receive(f) = deq f
member this.Send(x) = enq x
[<Sealed>]
type Agent<'T,'R>(f: Channel<'T> -> Async<'R>, opts) =
let chan = Channel<'T>()
let task = Async.StartAsTask(f chan, opts)
member this.Send(m) = chan.Send(m)
member this.Task = task
[<Sealed>]
type Agent =
static member Start(f) = Agent(f, TaskCreationOptions.None)
static member Start(f, opts) = Agent(f, opts)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment