Skip to content

Instantly share code, notes, and snippets.

@t0yv0
Created December 31, 2011 00:06
(*
Copyright (c) 2008-2011 IntelliFactory
GNU Affero General Public License Usage The code
is free software: you can redistribute it and/or
modify it under the terms of the GNU Affero
General Public License, version 3, as published by
the Free Software Foundation.
The code is distributed in the hope that it will
be useful, but WITHOUT ANY WARRANTY; without even
the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Affero
General Public License for more details at
<http://www.gnu.org/licenses/>.
If you are unsure which license is appropriate for
your use, please contact IntelliFactory at
<http://intellifactory.com/contact>.
See this blog for the discussion:
<http://tinyurl.com/fsharp-futures>
*)
#if INTERACTIVE
#else
namespace IntelliFactory.Examples
#endif
open System
open System.Threading
open System.Threading.Tasks
open System.Collections.Concurrent
type private FutureState<'T> =
| Computed of 'T
| Created
| Finalized
| Waiting of ('T -> unit)
[<Sealed>]
type Future<'T>() =
let root = obj ()
let transact f = lock root f ()
let mutable state : FutureState<'T> = Created
let await f =
transact <| fun () ->
match state with
| Computed x -> state <- Finalized; (fun () -> f x)
| Created -> state <- Waiting f; ignore
| Finalized -> invalidOp "Future is finalized."
| Waiting f -> invalidOp "Future is already waited on."
let provide value =
transact <| fun () ->
match state with
| Computed x -> invalidOp "Future is already provided."
| Created -> state <- Computed value; ignore
| Finalized -> invalidOp "Future is finalized."
| Waiting f -> state <- Finalized; (fun () -> f value)
let event = Async.FromContinuations(fun (k, _, _) -> await k)
member this.Await = event
member this.Provide(value) = provide value
[<Sealed>]
type Executor(?maxTaskCount, ?logError) =
let logError = defaultArg logError ignore
let mailbox =
let n = defaultArg maxTaskCount 128
new BlockingCollection<_>(ConcurrentQueue(), n)
let work () =
let mutable loop = true
while loop do
match mailbox.Take() with
| None -> loop <- false
| Some exec -> try exec () with e -> logError e
let task =
Task.Factory.StartNew(work,
TaskCreationOptions.LongRunning)
member this.Dispose() =
mailbox.Add(None)
mailbox.CompleteAdding()
task.Wait()
task.Dispose()
mailbox.Dispose()
member this.Fork(job: Async<'T>) =
let f = Future()
let work () =
Async.StartWithContinuations(job,
f.Provide, logError, logError)
this.Schedule(work)
f.Await
member this.Schedule(task) = mailbox.Add(Some task)
member this.TaskCount = mailbox.Count
interface IDisposable with
member this.Dispose() = this.Dispose()
#if INTERACTIVE
let test () =
use e = new Executor()
let task =
async {
let read = e.Fork(async { return stdin.ReadLine() })
do stdout.WriteLine("Waiting for input..")
return! read
}
Async.RunSynchronously task
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment