Created
December 31, 2011 00:06
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(* | |
Copyright (c) 2008-2011 IntelliFactory | |
GNU Affero General Public License Usage The code | |
is free software: you can redistribute it and/or | |
modify it under the terms of the GNU Affero | |
General Public License, version 3, as published by | |
the Free Software Foundation. | |
The code is distributed in the hope that it will | |
be useful, but WITHOUT ANY WARRANTY; without even | |
the implied warranty of MERCHANTABILITY or FITNESS | |
FOR A PARTICULAR PURPOSE. See the GNU Affero | |
General Public License for more details at | |
<http://www.gnu.org/licenses/>. | |
If you are unsure which license is appropriate for | |
your use, please contact IntelliFactory at | |
<http://intellifactory.com/contact>. | |
See this blog for the discussion: | |
<http://tinyurl.com/fsharp-futures> | |
*) | |
#if INTERACTIVE | |
#else | |
namespace IntelliFactory.Examples | |
#endif | |
open System | |
open System.Threading | |
open System.Threading.Tasks | |
open System.Collections.Concurrent | |
type private FutureState<'T> = | |
| Computed of 'T | |
| Created | |
| Finalized | |
| Waiting of ('T -> unit) | |
[<Sealed>] | |
type Future<'T>() = | |
let root = obj () | |
let transact f = lock root f () | |
let mutable state : FutureState<'T> = Created | |
let await f = | |
transact <| fun () -> | |
match state with | |
| Computed x -> state <- Finalized; (fun () -> f x) | |
| Created -> state <- Waiting f; ignore | |
| Finalized -> invalidOp "Future is finalized." | |
| Waiting f -> invalidOp "Future is already waited on." | |
let provide value = | |
transact <| fun () -> | |
match state with | |
| Computed x -> invalidOp "Future is already provided." | |
| Created -> state <- Computed value; ignore | |
| Finalized -> invalidOp "Future is finalized." | |
| Waiting f -> state <- Finalized; (fun () -> f value) | |
let event = Async.FromContinuations(fun (k, _, _) -> await k) | |
member this.Await = event | |
member this.Provide(value) = provide value | |
[<Sealed>] | |
type Executor(?maxTaskCount, ?logError) = | |
let logError = defaultArg logError ignore | |
let mailbox = | |
let n = defaultArg maxTaskCount 128 | |
new BlockingCollection<_>(ConcurrentQueue(), n) | |
let work () = | |
let mutable loop = true | |
while loop do | |
match mailbox.Take() with | |
| None -> loop <- false | |
| Some exec -> try exec () with e -> logError e | |
let task = | |
Task.Factory.StartNew(work, | |
TaskCreationOptions.LongRunning) | |
member this.Dispose() = | |
mailbox.Add(None) | |
mailbox.CompleteAdding() | |
task.Wait() | |
task.Dispose() | |
mailbox.Dispose() | |
member this.Fork(job: Async<'T>) = | |
let f = Future() | |
let work () = | |
Async.StartWithContinuations(job, | |
f.Provide, logError, logError) | |
this.Schedule(work) | |
f.Await | |
member this.Schedule(task) = mailbox.Add(Some task) | |
member this.TaskCount = mailbox.Count | |
interface IDisposable with | |
member this.Dispose() = this.Dispose() | |
#if INTERACTIVE | |
let test () = | |
use e = new Executor() | |
let task = | |
async { | |
let read = e.Fork(async { return stdin.ReadLine() }) | |
do stdout.WriteLine("Waiting for input..") | |
return! read | |
} | |
Async.RunSynchronously task | |
#endif |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment