Skip to content

Instantly share code, notes, and snippets.

@nsf
Last active August 29, 2015 14:25
Show Gist options
  • Save nsf/4999c20ef4ccbbc333a5 to your computer and use it in GitHub Desktop.
Save nsf/4999c20ef4ccbbc333a5 to your computer and use it in GitHub Desktop.
module NG.FS.Coroutines
open System
open System.Threading
open System.Collections.Generic
open System.Collections.Concurrent
/// Kind of work a coroutine performs. `Scheduler.Go` reads it to choose
/// between the CPU worker queue and the IO worker queue.
type Type =
    | CPU = 0
    | IO = 1
/// Priority label carried in `Parameters`.
/// NOTE(review): none of the scheduler code visible in this file reads it.
type Priority =
    | Critical = 0
    | High = 1
    | Normal = 2
    | Low = 3
    | Background = 4
/// Settings attached to a coroutine: a display name, the kind of work it
/// does, and a priority label.
[<Struct; NoEquality; NoComparison>]
type Parameters =
    val mutable Name : string
    val mutable Type : Type
    val mutable Priority : Priority
/// Parameters given to coroutines that are created without explicit ones.
let private defaultParameters =
    Parameters(
        Name = "<unknown>",
        Type = Type.CPU,
        Priority = Priority.Normal)
/// Command that a routine step reports to whoever is driving it.
type CoType =
    /// Keep stepping right away. Consumed by the `Run` methods; the scheduler
    /// rejects it as an invalid command.
    | Continue = 0
    /// Put the coroutine back on its worker queue.
    | Yield = 1
    /// The coroutine has finished.
    | Done = 2
    /// `Co.Subject` is a `Coroutine` to start; the issuer is queued again
    /// once that coroutine is done.
    | WaitForCoroutine = 3
    /// `Co.Subject` is a `seq<Coroutine>` to start; the issuer is queued again
    /// once all of them are done.
    | WaitForCoroutines = 4
    /// `Co.Subject` is a message to push; the issuer is suspended under a cookie.
    | WaitForMessage = 5
    /// `Co.Subject` is the `Mutex` to acquire.
    | LockMutex = 6
    /// `Co.Subject` is the `Mutex` to release.
    | UnlockMutex = 7
/// A command (`Type`) together with an optional payload (`Subject`) whose
/// meaning depends on the command.
[<Struct; NoEquality; NoComparison>]
type Co =
    val Type : CoType
    val Subject : Object

    /// Command without a payload; `Subject` is null.
    new (t) =
        { Type = t
          Subject = null }

    /// Command with payload `s`.
    new (t, s : Object) =
        { Type = t
          Subject = s }
/// A computation that is either finished or still has steps left.
type Routine<'T> =
    /// Finished, carrying the result.
    | Done of 'T
    /// Unfinished: a thunk that yields the rest of the routine, paired with
    /// the command reported for this step.
    | NotDone of (unit -> Routine<'T>) * Co
/// Chains a `Routine<'T>` with a continuation that consumes its result,
/// producing a `Routine<'U>`.
[<Struct; NoEquality; NoComparison>]
type RoutineCombiner<'T, 'U> =
    /// What is left of the first routine; replaced on every non-final step.
    val mutable sequence : Routine<'T>
    /// Called with the first routine's result once it is `Done`.
    val continuation : 'T -> Routine<'U>

    new (s, c) = { sequence = s; continuation = c }

    /// Wraps `Run` as a pending step that reports a plain `Continue`.
    member this.Continue () : Routine<'U> =
        NotDone (this.Run, Co(CoType.Continue))

    /// Advances the first routine. `Continue` steps are run back-to-back;
    /// any other command is passed outward with `Continue` as the resume point.
    member this.Run () : Routine<'U> =
        match this.sequence with
        | Done result ->
            this.continuation result
        | NotDone (step, command) ->
            this.sequence <- step ()
            match command.Type with
            | CoType.Continue -> this.Run ()
            | _ -> NotDone (this.Continue, command)
/// Base class of everything the scheduler can run.
[<AbstractClass; AllowNullLiteral>]
type Coroutine (parameters : Parameters) =
    member val Parameters = parameters
    /// Counter to decrement when this coroutine finishes (null if nobody waits on it).
    [<DefaultValue>] val mutable Counter : Counter
    /// Mutex this coroutine has asked to lock (null if none).
    [<DefaultValue>] val mutable Mutex : Mutex
    /// Executes until the next command and returns it.
    abstract member Run : unit -> Co

/// Number of coroutines still outstanding plus the coroutine to queue once
/// that number reaches zero.
and [<AllowNullLiteral>] Counter () =
    [<DefaultValue>] val mutable Count : int
    [<DefaultValue>] val mutable WaitingCoroutine : Coroutine

/// Counting semaphore with a queue of blocked coroutines. `Queue` and `Count`
/// are meant to be touched only between `Lock` and `Unlock`.
and [<AllowNullLiteral>] Semaphore (count : int) =
    // 0 = free, 1 = taken.
    [<VolatileField>]
    let mutable guard = 0

    member val Queue = Queue<Coroutine>()
    member val Count = count with get, set

    /// Busy-waits until the guard flips from 0 to 1.
    member inline internal this.Lock () =
        while Interlocked.CompareExchange(&guard, 1, 0) <> 0 do
            ()

    /// Releases the guard.
    member inline internal this.Unlock () =
        guard <- 0

/// A semaphore whose initial count is 1.
and [<AllowNullLiteral>] Mutex () =
    inherit Semaphore(1)
/// Coroutine that steps through a `Routine<'T>` and stores the final value in
/// `Result`.
type Coroutine<'T> =
    inherit Coroutine

    /// What is left of the routine.
    val mutable private sequence : Routine<'T>

    /// Value of the routine; assigned when `Run` reaches `Done`.
    [<DefaultValue>]
    val mutable Result : 'T

    /// Wraps `s` using the default parameters.
    new (s : Routine<'T>) =
        { inherit Coroutine(defaultParameters)
          sequence = s }

    /// Wraps `s` using the parameters `p`.
    new (s : Routine<'T>, p : Parameters) =
        { inherit Coroutine(p)
          sequence = s }

    /// Steps the routine until it finishes or reports a command other than
    /// `Continue`, and returns that command (`Done` on completion).
    override this.Run () : Co =
        match this.sequence with
        | Done value ->
            this.Result <- value
            Co(CoType.Done)
        | NotDone (step, command) ->
            this.sequence <- step ()
            match command.Type with
            | CoType.Continue -> this.Run ()
            | _ -> command
/// Builds a step that reports `co` and, when stepped again afterwards,
/// proceeds with `cont` under a plain `Continue`.
let inline private bind (co : Co, cont : unit -> Routine<'T>) : Routine<'T> =
    let resume () = NotDone (cont, Co(CoType.Continue))
    NotDone (resume, co)
/// Computation-expression builder that produces `Routine<'T>` values.
type RoutineBuilder () =

    /// Empty body: a finished routine with a unit result.
    member this.Zero () =
        Done ()

    /// Defers `expr` until the routine is stepped.
    member this.Delay (expr) =
        NotDone (expr, Co(CoType.Continue))

    /// Runs `s1` to completion, then continues as `s2`.
    member this.Combine (s1 : Routine<unit>, s2 : Routine<'T>) : Routine<'T> =
        NotDone (RoutineCombiner(s1, (fun () -> s2)).Run, Co(CoType.Continue))

    /// `let!` on a routine: runs it and feeds its result to `cont`.
    member this.Bind (seq : Routine<'T>, cont : 'T -> Routine<'U>) : Routine<'U> =
        NotDone (RoutineCombiner(seq, cont).Run, Co(CoType.Continue))

    /// `do!` on a raw command: reports it, then continues with `cont`.
    member this.Bind (co : Co, cont : unit -> Routine<'T>) : Routine<'T> =
        bind (co, cont)

    /// `let!` on a coroutine: reports `WaitForCoroutine`, then passes the
    /// coroutine's `Result` to `cont`. `Result` is read only when resumed.
    member this.Bind (cor : Coroutine<'T>, cont : 'T -> Routine<'U>) : Routine<'U> =
        bind (Co(CoType.WaitForCoroutine, cor), fun () -> cont cor.Result)

    /// `do!` on several coroutines: reports `WaitForCoroutines`, then continues.
    member this.Bind (cors : seq<Coroutine>, cont : unit -> Routine<'T>) : Routine<'T> =
        bind (Co(CoType.WaitForCoroutines, cors), cont)

    /// `do!` on a message: reports `WaitForMessage`, then continues.
    member this.Bind<'T, 'U when 'T :> NG.C.BindGen.IMessage> (msg : 'T, cont : unit -> Routine<'U>) : Routine<'U> =
        bind (Co(CoType.WaitForMessage, msg), cont)

    member this.Return (expr) =
        Done expr

    member this.ReturnFrom (expr) =
        expr
/// Routine builder whose `Run` wraps the finished routine in a `Coroutine<'T>`
/// that uses the default parameters.
type CoroutineBuilder () =
    inherit RoutineBuilder()

    member this.Run (s : Routine<'T>) =
        Coroutine<'T>(s)
/// Routine builder that wraps the finished routine in a `Coroutine<'T>` with
/// custom parameters. Each omitted argument falls back to the matching field
/// of the default parameters.
type coroutine_p (?Name : string, ?Type : Type, ?Priority : Priority) =
    inherit RoutineBuilder()

    let parameters =
        Parameters(
            Name = defaultArg Name defaultParameters.Name,
            Type = defaultArg Type defaultParameters.Type,
            Priority = defaultArg Priority defaultParameters.Priority)

    member this.Run (s : Routine<'T>) =
        Coroutine<'T>(s, parameters)
/// Builder for `coroutine { ... }` expressions, which evaluate to a `Coroutine<'T>`.
let coroutine = CoroutineBuilder()
/// Builder for `routine { ... }` expressions, which evaluate to a `Routine<'T>`.
let routine = RoutineBuilder()
/// Worker-thread scheduler: queues coroutines, carries out the commands they
/// report, and parks the ones that wait for a message.
module Scheduler =

    /// One slot of the `suspended` table. An occupied slot holds the parked
    /// coroutine; a free slot links to the next free slot through `NextFree`.
    [<Struct>]
    type private WaitingCoroutine =
        val mutable NextFree : int
        val mutable Coroutine : Coroutine

    let private toCPUWorkers = new BlockingCollection<Coroutine>(ConcurrentQueue<Coroutine>())
    let private toIOWorker = new BlockingCollection<Coroutine>(ConcurrentQueue<Coroutine>())
    let private workers = List<Thread>()

    // Slot 0 never holds a coroutine: it is the head of the free list, and a
    // NextFree of 0 means "no free slot".
    let private suspended = List<WaitingCoroutine>()
    do suspended.Add(WaitingCoroutine(NextFree = 0))

    /// Puts coroutine into queue, returns immediately
    let Go (c : Coroutine) =
        match c.Parameters.Type with
        | Type.CPU -> toCPUWorkers.Add(c)
        | Type.IO -> toIOWorker.Add(c)
        | _ -> failwith "Unknown Coroutine type"

    /// Parks `c` in the suspended table and returns the index of its slot,
    /// reusing a free slot when one is available.
    let Suspend (c : Coroutine) : int =
        lock suspended <| fun () ->
            let free = suspended.[0].NextFree
            if free <> 0 then
                // Pop the first free slot off the free list and occupy it.
                suspended.[0] <- WaitingCoroutine(NextFree = suspended.[free].NextFree)
                suspended.[free] <- WaitingCoroutine(Coroutine = c)
                free
            else
                suspended.Add(WaitingCoroutine(Coroutine = c))
                suspended.Count-1

    /// Returns the coroutine parked in slot `i` and releases the slot.
    /// Returns null, leaving the table untouched, when the slot holds no
    /// coroutine (slot 0, or a slot that has already been resumed).
    let Resume (i : int) : Coroutine =
        lock suspended <| fun () ->
            let c = suspended.[i].Coroutine
            // Pushing slot 0 or an already-free slot onto the free list would
            // drop the list or create a cycle, after which Suspend could hand
            // the same slot to two coroutines.
            if not (isNull c) then
                suspended.[i] <- WaitingCoroutine(NextFree = suspended.[0].NextFree)
                suspended.[0] <- WaitingCoroutine(NextFree = i)
            c

    /// Suspends `c`, stores its slot index as the cookie of a new message
    /// built from `imsg.Handle`, and pushes that message onto the MT queue.
    let WaitForMessageAnd (imsg : 'T when 'T :> NG.C.BindGen.IMessage) (c : Coroutine) =
        let mutable msg = new NG.C.CSMessage(imsg.Handle)
        msg.Cookie <- Suspend c
        NG.C.MTQueue.Push(msg)

    /// Increments the semaphore count and, if coroutines are still waiting,
    /// queues the first one to run.
    let private signalSemaphore (sem : Semaphore) =
        sem.Lock()
        try
            sem.Count <- sem.Count + 1
            if sem.Count <= 0 && sem.Queue.Count > 0 then
                sem.Queue.Dequeue() |> Go
        finally
            // Always release the spin guard: if Go throws while it is held,
            // every later Lock() on this semaphore would spin forever.
            sem.Unlock()

    /// Returns true if the coroutine was put onto waiting queue
    let private waitSemaphore (sem : Semaphore) (c : Coroutine) : bool =
        sem.Lock()
        try
            sem.Count <- sem.Count - 1
            if sem.Count < 0 then
                sem.Queue.Enqueue(c)
                true
            else
                false
        finally
            sem.Unlock()

    /// Records `m` as the mutex of `c` and waits on it. Returns true if `c`
    /// was queued behind the current owner. Fails if `c` already has a mutex.
    let private lockMutex (m : Mutex) (c : Coroutine) =
        if not (isNull(c.Mutex)) then
            if c.Mutex = m then
                failwith "Trying to lock Mutex twice"
            else
                failwith "Trying to lock more than one Mutex per Coroutine"
        c.Mutex <- m
        waitSemaphore m c

    /// Clears the mutex of `c` and signals `m`. Fails if `m` is not the mutex
    /// recorded on `c`.
    let private unlockMutex (m : Mutex) (c : Coroutine) =
        if c.Mutex <> m then
            if isNull(c.Mutex) then
                failwith "Trying to unlock Mutex that was not locked by this Coroutine"
            else
                failwith "Trying to unlock Mutex that doesn't match the Mutex locked by this Coroutine"
        c.Mutex <- null
        signalSemaphore m

    /// Steps `c` once and carries out the command it reports.
    let rec private run (c : Coroutine) =
        let co = c.Run()
        match co.Type with
        | CoType.Done ->
            let cn, m = c.Counter, c.Mutex
            // Release a mutex the finished coroutine still has recorded.
            if not (isNull(m)) then
                unlockMutex m c
            // The last coroutine to finish wakes the one waiting on the counter.
            if not (isNull(cn)) && Interlocked.Decrement(&cn.Count) = 0 then
                Go cn.WaitingCoroutine
        | CoType.Continue -> failwith "Continue is not a valid Scheduler command"
        | CoType.Yield -> Go c
        | CoType.WaitForCoroutine ->
            let cor = co.Subject :?> Coroutine
            let counter = Counter(Count = 1, WaitingCoroutine = c)
            cor.Counter <- counter
            Go cor
        | CoType.WaitForCoroutines ->
            let cors = (co.Subject :?> seq<Coroutine>) |> Seq.toArray
            if cors.Length = 0 then
                // Nothing to wait for: keep running the coroutine right away.
                run c
            else
                let counter = Counter(Count = cors.Length, WaitingCoroutine = c)
                for cc in cors do
                    cc.Counter <- counter
                    Go cc
        | CoType.WaitForMessage ->
            let imsg = co.Subject :?> NG.C.BindGen.IMessage
            WaitForMessageAnd imsg c
        | CoType.LockMutex ->
            let m = co.Subject :?> Mutex
            // Acquired without queueing: keep running on this worker.
            if not (lockMutex m c) then
                run c
        | CoType.UnlockMutex ->
            let m = co.Subject :?> Mutex
            unlockMutex m c
            run c
        | _ -> failwith "Unknown Scheduler command type"

    /// Starts a thread that takes coroutines from `incoming` and runs them
    /// until it takes a null, then returns the thread.
    let private startWorker (incoming : BlockingCollection<Coroutine>) (name : string) =
        let rec loop() =
            let c = incoming.Take()
            if not (isNull(c)) then
                run c
                loop()
        let run() =
            Thread.BeginThreadAffinity()
            Console.WriteLine("{0} on duty", name)
            loop()
            Console.WriteLine("{0} shutting down", name)
            Thread.EndThreadAffinity()
        let thread = Thread(ThreadStart(run))
        thread.Start()
        thread

    /// Starts a thread that repeatedly pops all queued messages and passes
    /// each one to `dispatch`. The thread exits after a batch that contains a
    /// message with a zero handle. Returns the thread.
    let private startMessagePump dispatch =
        let inbox = NG.C.Vector_CSMessagePtr.New()
        let rec loop() : unit =
            NG.C.MTQueue.PopAll(inbox)
            let mutable foundZero = false
            for i = 0 to inbox.Count-1 do
                let msg = inbox.[i]
                if msg.Handle = IntPtr.Zero then
                    foundZero <- true
                else
                    dispatch msg
            if not foundZero then
                loop()
        let run() =
            Thread.BeginThreadAffinity()
            Console.WriteLine("NG CLR Message Pump on duty")
            loop()
            Console.WriteLine("NG CLR Message Pump shutting down")
            Thread.EndThreadAffinity()
        let thread = Thread(ThreadStart(run))
        thread.Start()
        thread

    /// Starts one CPU worker per processor, one IO worker, and the message
    /// pump, which hands incoming messages to `d`.
    let Start (d : NG.C.CSMessage -> unit) =
        for i = 0 to Environment.ProcessorCount-1 do
            let name = String.Format("NG CLR CPU Worker #{0}", i)
            workers.Add(startWorker toCPUWorkers name)
        workers.Add(startWorker toIOWorker "NG CLR IO Worker")
        workers.Add(startMessagePump d)

    /// Tells every worker and the message pump to stop, then waits for all
    /// started threads to finish.
    let Stop () =
        // One null per CPU worker; each worker exits on the first null it takes.
        for i = 0 to Environment.ProcessorCount-1 do
            toCPUWorkers.Add(null)
        toIOWorker.Add(null)
        NG.C.MTQueue.SendStop()
        for w in workers do
            w.Join()
/// Routine that waits on `msg` (a `WaitForMessage` command) and disposes it
/// once the routine is resumed.
let sendMessageAndDispose<'T when 'T :> NG.C.BindGen.IMessage> (msg : 'T) =
    routine {
        do! msg
        msg.Dispose()
    }
/// Pushes `msg` without a calling coroutine: a small coroutine whose only job
/// is to dispose `msg` is suspended in the caller's place.
let sendMessageAndDisposeAsync<'T when 'T :> NG.C.BindGen.IMessage> (msg : 'T) : unit =
    let disposer = (coroutine { msg.Dispose() }) :> Coroutine
    Scheduler.WaitForMessageAnd msg disposer
/// Command asking the scheduler to lock `m` for the coroutine that reports it.
let lockMutex (m : Mutex) : Co =
    Co(CoType.LockMutex, m)
/// Command asking the scheduler to unlock `m` for the coroutine that reports it.
let unlockMutex (m : Mutex) : Co =
    Co(CoType.UnlockMutex, m)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment