Last active
August 29, 2015 14:25
-
-
Save nsf/4999c20ef4ccbbc333a5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module NG.FS.Coroutines | |
open System | |
open System.Threading | |
open System.Collections.Generic | |
open System.Collections.Concurrent | |
type Type = | |
| CPU = 0 | |
| IO = 1 | |
type Priority = | |
| Critical = 0 | |
| High = 1 | |
| Normal = 2 | |
| Low = 3 | |
| Background = 4 | |
[<Struct;NoEquality;NoComparison>] | |
type Parameters = | |
val mutable Name : string | |
val mutable Type : Type | |
val mutable Priority : Priority | |
let private defaultParameters = Parameters(Name = "<unknown>", Type = Type.CPU, Priority = Priority.Normal) | |
type CoType = | |
| Continue = 0 | |
| Yield = 1 | |
| Done = 2 | |
| WaitForCoroutine = 3 | |
| WaitForCoroutines = 4 | |
| WaitForMessage = 5 | |
| LockMutex = 6 | |
| UnlockMutex = 7 | |
[<Struct;NoEquality;NoComparison>] | |
type Co = | |
val Type : CoType | |
val Subject : Object | |
new (t) = { Type = t; Subject = null } | |
new (t, s : Object) = { Type = t; Subject = s } | |
type Routine<'T> = | |
| Done of 'T | |
| NotDone of (unit -> Routine<'T>) * Co | |
[<Struct;NoEquality;NoComparison>] | |
type RoutineCombiner<'T, 'U> = | |
val mutable sequence : Routine<'T> | |
val continuation : 'T -> Routine<'U> | |
new (s, c) = { sequence = s; continuation = c } | |
member this.Continue () : Routine<'U> = | |
NotDone (this.Run, Co(CoType.Continue)) | |
member this.Run () : Routine<'U> = | |
match this.sequence with | |
| Done v -> | |
this.continuation v | |
| NotDone (f, co) -> | |
this.sequence <- f() | |
if co.Type = CoType.Continue then this.Run() | |
else NotDone (this.Continue, co) | |
[<AbstractClass;AllowNullLiteral>] | |
type Coroutine (parameters : Parameters) = | |
member val Parameters = parameters | |
[<DefaultValue>] val mutable Counter : Counter | |
[<DefaultValue>] val mutable Mutex : Mutex | |
abstract member Run : unit -> Co | |
and [<AllowNullLiteral>] Counter () = | |
[<DefaultValue>] val mutable Count : int | |
[<DefaultValue>] val mutable WaitingCoroutine : Coroutine | |
and [<AllowNullLiteral>] Semaphore (count : int) = | |
[<VolatileField>] | |
let mutable guard = 0 | |
member val Queue = Queue<Coroutine>() | |
member val Count = count with get, set | |
member inline internal s.Lock () = while Interlocked.CompareExchange(&guard, 1, 0) <> 0 do () | |
member inline internal s.Unlock () = guard <- 0 | |
and [<AllowNullLiteral>] Mutex () = | |
inherit Semaphore(1) | |
type Coroutine<'T> = | |
inherit Coroutine | |
val mutable private sequence : Routine<'T> | |
[<DefaultValue>] | |
val mutable Result : 'T | |
new (s : Routine<'T>) = { inherit Coroutine(defaultParameters); sequence = s } | |
new (s : Routine<'T>, p : Parameters) = { inherit Coroutine(p); sequence = s } | |
override this.Run() : Co = | |
match this.sequence with | |
| Done v -> | |
this.Result <- v | |
Co(CoType.Done) | |
| NotDone (f, co) -> | |
this.sequence <- f() | |
if co.Type = CoType.Continue then this.Run() | |
else co | |
let inline private bind (co : Co, cont : unit -> Routine<'T>) : Routine<'T> = | |
NotDone ((fun () -> NotDone (cont, Co(CoType.Continue))), co) | |
type RoutineBuilder () = | |
member x.Zero () = Done () | |
member x.Delay (expr) = NotDone (expr, Co(CoType.Continue)) | |
member x.Combine (s1 : Routine<unit>, s2 : Routine<'T>) : Routine<'T> = | |
NotDone (RoutineCombiner(s1, (fun () -> s2)).Run, Co(CoType.Continue)) | |
member x.Bind (seq : Routine<'T>, cont : 'T -> Routine<'U>) : Routine<'U> = | |
NotDone (RoutineCombiner(seq, cont).Run, Co(CoType.Continue)) | |
member x.Bind (co : Co, cont : unit -> Routine<'T>) : Routine<'T> = bind(co, cont) | |
member x.Bind (cor : Coroutine<'T>, cont : 'T -> Routine<'U>) : Routine<'U> = | |
let c = fun () -> cont cor.Result | |
bind(Co(CoType.WaitForCoroutine, cor), c) | |
member x.Bind (cors : seq<Coroutine>, cont : unit -> Routine<'T>) : Routine<'T> = | |
bind(Co(CoType.WaitForCoroutines, cors), cont) | |
member x.Bind<'T, 'U when 'T :> NG.C.BindGen.IMessage> (msg : 'T, cont : unit -> Routine<'U>) : Routine<'U> = | |
bind(Co(CoType.WaitForMessage, msg), cont) | |
member x.Return (expr) = Done expr | |
member x.ReturnFrom (expr) = expr | |
type CoroutineBuilder () = | |
inherit RoutineBuilder() | |
member x.Run(s : Routine<'T>) = Coroutine<'T>(s) | |
type coroutine_p (?Name : string, ?Type : Type, ?Priority : Priority) = | |
inherit RoutineBuilder() | |
let parameters = Parameters(Name = defaultArg Name defaultParameters.Name, | |
Type = defaultArg Type defaultParameters.Type, | |
Priority = defaultArg Priority defaultParameters.Priority) | |
member x.Run(s : Routine<'T>) = Coroutine<'T>(s, parameters) | |
let coroutine = CoroutineBuilder() | |
let routine = RoutineBuilder() | |
module Scheduler = | |
[<Struct>] | |
type private WaitingCoroutine = | |
val mutable NextFree : int | |
val mutable Coroutine : Coroutine | |
let private toCPUWorkers = new BlockingCollection<Coroutine>(ConcurrentQueue<Coroutine>()) | |
let private toIOWorker = new BlockingCollection<Coroutine>(ConcurrentQueue<Coroutine>()) | |
let private workers = List<Thread>() | |
let private suspended = List<WaitingCoroutine>() | |
do suspended.Add(WaitingCoroutine(NextFree = 0)) | |
/// Puts coroutine into queue, returns immediately | |
let Go (c : Coroutine) = | |
match c.Parameters.Type with | |
| Type.CPU -> toCPUWorkers.Add(c) | |
| Type.IO -> toIOWorker.Add(c) | |
| _ -> failwith "Unknown Coroutine type" | |
let Suspend (c : Coroutine) : int = | |
lock suspended <| fun () -> | |
let free = suspended.[0].NextFree | |
if free <> 0 then | |
suspended.[0] <- WaitingCoroutine(NextFree = suspended.[free].NextFree) | |
suspended.[free] <- WaitingCoroutine(Coroutine = c) | |
free | |
else | |
suspended.Add(WaitingCoroutine(Coroutine = c)) | |
suspended.Count-1 | |
let Resume (i : int) : Coroutine = | |
lock suspended <| fun () -> | |
let c = suspended.[i].Coroutine | |
suspended.[i] <- WaitingCoroutine(NextFree = suspended.[0].NextFree) | |
suspended.[0] <- WaitingCoroutine(NextFree = i) | |
c | |
let WaitForMessageAnd (imsg : 'T when 'T :> NG.C.BindGen.IMessage) (c : Coroutine) = | |
let mutable msg = new NG.C.CSMessage(imsg.Handle) | |
msg.Cookie <- Suspend c | |
NG.C.MTQueue.Push(msg) | |
let private signalSemaphore (sem : Semaphore) = | |
sem.Lock() | |
sem.Count <- sem.Count + 1 | |
if sem.Count <= 0 then | |
if sem.Queue.Count > 0 then | |
sem.Queue.Dequeue() |> Go | |
sem.Unlock() | |
/// Returns true if the coroutine was put onto waiting queue | |
let private waitSemaphore (sem : Semaphore) (c : Coroutine) : bool = | |
sem.Lock() | |
sem.Count <- sem.Count - 1 | |
let result = | |
if sem.Count < 0 then | |
sem.Queue.Enqueue(c) | |
true | |
else | |
false | |
sem.Unlock() | |
result | |
let private lockMutex (m : Mutex) (c : Coroutine) = | |
if not (isNull(c.Mutex)) then | |
if c.Mutex = m then | |
failwith "Trying to lock Mutex twice" | |
else | |
failwith "Trying to lock more than one Mutex per Coroutine" | |
c.Mutex <- m | |
waitSemaphore m c | |
let private unlockMutex (m : Mutex) (c : Coroutine) = | |
if c.Mutex <> m then | |
if isNull(c.Mutex) then | |
failwith "Trying to unlock Mutex that was not locked by this Coroutine" | |
else | |
failwith "Trying to unlock Mutex that doesn't match the Mutex locked by this Coroutine" | |
c.Mutex <- null | |
signalSemaphore m | |
let rec private run (c : Coroutine) = | |
let co = c.Run() | |
match co.Type with | |
| CoType.Done -> | |
let cn, m = c.Counter, c.Mutex | |
if not (isNull(m)) then | |
unlockMutex m c | |
if not (isNull(cn)) && Interlocked.Decrement(&cn.Count) = 0 then | |
Go cn.WaitingCoroutine | |
| CoType.Continue -> failwith "Continue is not a valid Scheduler command" | |
| CoType.Yield -> Go c | |
| CoType.WaitForCoroutine -> | |
let cor = co.Subject :?> Coroutine | |
let counter = Counter(Count = 1, WaitingCoroutine = c) | |
cor.Counter <- counter | |
Go cor | |
| CoType.WaitForCoroutines -> | |
let cors = (co.Subject :?> seq<Coroutine>) |> Seq.toArray | |
if cors.Length = 0 then | |
run c | |
else | |
let counter = Counter(Count = cors.Length, WaitingCoroutine = c) | |
for cc in cors do | |
cc.Counter <- counter | |
Go cc | |
| CoType.WaitForMessage -> | |
let imsg = co.Subject :?> NG.C.BindGen.IMessage | |
let mutable msg = new NG.C.CSMessage(imsg.Handle) | |
msg.Cookie <- Suspend c | |
NG.C.MTQueue.Push(msg) | |
| CoType.LockMutex -> | |
let m = co.Subject :?> Mutex | |
if not (lockMutex m c) then | |
run c | |
| CoType.UnlockMutex -> | |
let m = co.Subject :?> Mutex | |
unlockMutex m c | |
run c | |
| _ -> failwith "Unknown Scheduler command type" | |
let private startWorker (incoming : BlockingCollection<Coroutine>) (name : string) = | |
let rec loop() = | |
let c = incoming.Take() | |
if not (isNull(c)) then | |
run c | |
loop() | |
let run() = | |
Thread.BeginThreadAffinity() | |
Console.WriteLine("{0} on duty", name) | |
loop() | |
Console.WriteLine("{0} shutting down", name) | |
Thread.EndThreadAffinity() | |
let thread = Thread(ThreadStart(run)) | |
thread.Start() | |
thread | |
let private startMessagePump dispatch = | |
let inbox = NG.C.Vector_CSMessagePtr.New() | |
let rec loop() : unit = | |
NG.C.MTQueue.PopAll(inbox) | |
let mutable foundZero = false | |
for i = 0 to inbox.Count-1 do | |
let msg = inbox.[i] | |
if msg.Handle = IntPtr.Zero then | |
foundZero <- true | |
else | |
dispatch msg | |
if not foundZero then | |
loop() | |
let run() = | |
Thread.BeginThreadAffinity() | |
Console.WriteLine("NG CLR Message Pump on duty") | |
loop() | |
Console.WriteLine("NG CLR Message Pump shutting down") | |
Thread.EndThreadAffinity() | |
let thread = Thread(ThreadStart(run)) | |
thread.Start() | |
thread | |
let Start (d : NG.C.CSMessage -> unit) = | |
for i = 0 to Environment.ProcessorCount-1 do | |
let name = String.Format("NG CLR CPU Worker #{0}", i) | |
workers.Add(startWorker toCPUWorkers name) | |
workers.Add(startWorker toIOWorker "NG CLR IO Worker") | |
workers.Add(startMessagePump d) | |
let Stop () = | |
for i = 0 to Environment.ProcessorCount-1 do | |
toCPUWorkers.Add(null) | |
toIOWorker.Add(null) | |
NG.C.MTQueue.SendStop() | |
for w in workers do | |
w.Join() | |
let sendMessageAndDispose<'T when 'T :> NG.C.BindGen.IMessage> (msg : 'T) = routine { | |
do! msg | |
msg.Dispose() | |
} | |
let sendMessageAndDisposeAsync<'T when 'T :> NG.C.BindGen.IMessage> (msg : 'T) : unit = | |
Scheduler.WaitForMessageAnd msg <| coroutine { msg.Dispose() } | |
let lockMutex (m : Mutex) : Co = Co(CoType.LockMutex, m) | |
let unlockMutex (m : Mutex) : Co = Co(CoType.UnlockMutex, m) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment