Skip to content

Instantly share code, notes, and snippets.

@t0yv0 t0yv0/ParWithLog.fs

Created Feb 6, 2014
Embed
What would you like to do?
Par monad extended with deterministic available-as-soon-as-possible logging.
#if INTERACTIVE
#else
namespace Examples
#endif
(*
[<Sealed>]
type Future<'T>
module Future =
val Create : unit -> Future<'T>
val CreateCompleted : 'T -> Future<'T>
val CreateFailed : exn -> Future<'T>
val Await : Future<'T> -> Async<'T>
val Fail : Future<'T> -> exn -> unit
val Set : Future<'T> -> 'T -> unit
*)
open System
open System.Threading
/// The lifecycle of a future: resolved with a value, resolved with an
/// exception, or still pending with a list of subscribers.
type FutureState<'T> =
    /// The future holds its final value.
    | Completed of 'T
    /// The future holds the exception it was failed with.
    | Failed of exn
    /// The future is unresolved; each entry pairs a success callback with
    /// a failure callback.
    | Waiting of (('T -> unit) * (exn -> unit)) list
/// A write-once cell. Its single mutable field holds the current
/// `FutureState` and is swapped with compare-and-exchange by the
/// `Future` module.
type Future<'T> = { mutable FutureState : FutureState<'T> }
/// Operations on write-once futures.
module Future =

    /// Creates an unresolved future with no subscribers.
    let Create () =
        { FutureState = Waiting [] }

    /// Creates a future that is already resolved with the value `x`.
    let CreateCompleted x =
        { FutureState = Completed x }

    /// Creates a future that is already resolved with the exception `e`.
    let CreateFailed e =
        { FutureState = Failed e }

    /// Replaces the state of `fut` with `f state` using a compare-and-exchange
    /// retry loop, and returns the state that was replaced.
    /// `f` may be invoked several times when the exchange is contended, so it
    /// should be free of side effects.
    let Update fut f =
        let rec attempt spins =
            let before = fut.FutureState
            let after = f before
            let witnessed = Interlocked.CompareExchange(&fut.FutureState, after, before)
            if Object.ReferenceEquals(before, witnessed) then
                before
            else
                // Lost the race: back off (spin count doubles, capped at 2^24) and retry.
                Thread.SpinWait(1 <<< min spins 24)
                attempt (spins + 1)
        attempt 0

    /// Waits for `fut` to resolve. If it is already resolved, the matching
    /// continuation runs immediately; otherwise the continuations are
    /// registered and later invoked by `Set` or `Fail`. The cancellation
    /// continuation is ignored.
    let Await fut =
        Async.FromContinuations(fun (onValue, onError, _) ->
            // Register only while pending; a resolved state is left untouched.
            let subscribe st =
                match st with
                | Waiting pending -> Waiting ((onValue, onError) :: pending)
                | Completed _ | Failed _ -> st
            match Update fut subscribe with
            | Completed value -> onValue value
            | Failed error -> onError error
            | Waiting _ -> ())

    /// Resolves `fut` with the exception `e` and notifies every registered
    /// failure callback. Raises (via `failwith`) if `fut` was already resolved.
    let Fail fut e =
        let failed = Failed e
        let resolve st =
            match st with
            | Waiting _ -> failed
            | _ -> st
        match Update fut resolve with
        | Waiting pending -> pending |> List.iter (fun (_, onError) -> onError e)
        | _ -> failwith "Future.Fail broke contract: called more than once"

    /// Resolves `fut` with the value `x` and notifies every registered
    /// success callback. Raises (via `failwith`) if `fut` was already resolved.
    let Set fut x =
        let completed = Completed x
        let resolve st =
            match st with
            | Waiting _ -> completed
            | _ -> st
        match Update fut resolve with
        | Waiting pending -> pending |> List.iter (fun (onValue, _) -> onValue x)
        | _ -> failwith "Future.Set broke contract: called more than once"
(*
[<Sealed>]
type Stream<'T>
module Stream =
val Empty<'T> : Stream<'T>
val Append<'T> : Stream<'T> -> Stream<'T> -> Stream<'T>
val FromFuture : Future<Stream<'T>> -> Stream<'T>
val Singleton : 'T -> Stream<'T>
val Iterate : ('T -> unit) -> Stream<'T> -> Async<unit>
*)
/// A sequence of values whose tail may not be known yet.
type Stream<'T> =
    /// The empty stream.
    | S0
    /// A stream holding exactly one element.
    | S1 of 'T
    /// The elements of the first stream followed by those of the second.
    | SApp of Stream<'T> * Stream<'T>
    /// A stream that becomes available when the future resolves.
    | SFut of Future<Stream<'T>>
/// Constructors and traversal for `Stream` values.
module Stream =

    /// The stream with no elements.
    let Empty<'T> : Stream<'T> = S0

    /// The stream containing only `x`.
    let Singleton x = S1 x

    /// A stream whose contents are supplied later through `fut`.
    let FromFuture fut = SFut fut

    /// The elements of `a` followed by the elements of `b`.
    let Append a b = SApp (a, b)

    /// Applies `f` to every element of `s` in order, waiting on each
    /// future-backed segment before continuing past it.
    let rec Iterate (f: 'T -> unit) (s: Stream<'T>) : Async<unit> =
        async {
            match s with
            | S0 ->
                return ()
            | S1 item ->
                return f item
            | SApp (first, second) ->
                do! Iterate f first
                return! Iterate f second
            | SFut pending ->
                let! resolved = Future.Await pending
                return! Iterate f resolved
        }
(*
type Message =
| Message of string
[<Sealed>]
type Par<'T>
module Par =
val Return : 'T -> Par<'T>
val Bind : Par<'T1> -> ('T1 -> Par<'T2>) -> Par<'T2>
val Spawn : Par<'T> -> Par<Future<'T>>
val Await : Future<'T> -> Par<'T>
val DoAsync : Async<'T> -> Par<'T>
val Log : Message -> Par<unit>
val Start : (Message -> unit) -> Par<'T> -> Async<'T>
*)
/// A log entry emitted by a `Par` computation; wraps the text to log.
type Message = Message of string
/// A parallel computation: a thunk that, when invoked, yields the stream of
/// messages it logs together with a future for its result.
type Par<'T> = Par of (unit -> Stream<Message> * Future<'T>)
/// Combinators for building and running `Par` computations.
module Par =

    /// Wraps a thunk as a `Par` computation.
    let Def par =
        Par par

    /// A computation that logs nothing and is already resolved with `x`.
    let Return (x: 'T) : Par<'T> =
        Def <| fun () -> (Stream.Empty, Future.CreateCompleted x)

    /// Runs the first computation, then feeds its result to `f` and runs the
    /// computation `f` returns. The combined log stream is the first
    /// computation's messages followed by the second's; the second part is a
    /// future-backed segment because it is not known until the first result
    /// arrives.
    ///
    /// If the first computation fails, or `f` (or the thunk it returns) throws,
    /// the second log segment is completed as empty and the result future is
    /// failed with the exception. Without completing the segment, anyone
    /// iterating the log stream (as `Start` does) would wait forever and the
    /// failure would never surface.
    let Bind (Par x) (f: 'T1 -> Par<'T2>) : Par<'T2> =
        Def <| fun () ->
            let (streamHead, x) = x ()
            let result = Future.Create()
            let streamVar = Future.Create()
            // Records whether streamVar has been handed its stream, so the
            // failure path never resolves it a second time.
            let streamPublished = ref false
            async {
                try
                    let! x = Future.Await x
                    let (Par yF) = f x
                    let (yS, yV) = yF ()
                    streamPublished.Value <- true
                    do Future.Set streamVar yS
                    let! y = Future.Await yV
                    return Future.Set result y
                with e ->
                    // Terminate the pending log segment so stream consumers
                    // can finish and then observe the failure on `result`.
                    if not streamPublished.Value then
                        Future.Set streamVar Stream.Empty
                    return Future.Fail result e
            }
            |> Async.Start
            let stream = Stream.Append streamHead (Stream.FromFuture streamVar)
            (stream, result)

    /// Starts `comp` and returns immediately with a future for its result.
    /// The messages logged by `comp` become the messages of the spawn step.
    let Spawn (Par comp : Par<'T>) : Par<Future<'T>> =
        Def <| fun () ->
            let (s, v) = comp ()
            (s, Future.CreateCompleted v)

    /// Lifts an `Async` workflow into `Par`. The workflow is started with
    /// `Async.Start`; its value or exception resolves the result future.
    /// Logs nothing.
    let DoAsync (a: Async<'T>) : Par<'T> =
        Def <| fun () ->
            let v = Future.Create ()
            async {
                try
                    let! r = a
                    return Future.Set v r
                with e ->
                    return Future.Fail v e
            }
            |> Async.Start
            (Stream.Empty, v)

    /// Waits for the future `f` inside `Par`. Logs nothing.
    let Await (f: Future<'T>) : Par<'T> =
        DoAsync <| async { return! Future.Await f }

    /// Emits `msg` to the log stream and completes immediately with unit.
    let Log (msg: Message) : Par<unit> =
        Def <| fun () ->
            (Stream.Singleton msg, Future.CreateCompleted ())

    /// Runs the computation: invokes its thunk, passes every logged message
    /// to `log` in stream order, and then yields the result (or raises the
    /// exception the result future was failed with).
    let Start (log: Message -> unit) (Par p : Par<'T>) : Async<'T> =
        async {
            let (s, v) = p ()
            do! Stream.Iterate log s
            return! Future.Await v
        }
(*
Example: parallelized fibonacci.
*)
/// Example driver: a Fibonacci computation that logs each call and runs one
/// of its two recursive branches through `Par.Spawn`.
module Main =

    /// Computation-expression builder that enables `par { ... }` syntax by
    /// delegating to `Par.Bind` and `Par.Return`.
    [<Sealed>]
    type ParBuilder() =
        member this.Bind(v, f) = Par.Bind v f
        member this.Return(v) = Par.Return v

    /// The builder instance behind `par { ... }` blocks.
    let par = ParBuilder()

    /// Fibonacci (with fib 0 = fib 1 = 1) that logs its argument and then
    /// sleeps 250 ms on every call.
    let rec ParFib (n: int) : Par<int> =
        par {
            do! Par.Log (Message (sprintf "%i" n))
            do! Par.DoAsync (Async.Sleep 250)
            match n with
            | 0 | 1 ->
                return 1
            | _ ->
                // Spawn the (n - 1) branch first, then run (n - 2) and join.
                let! spawned = Par.Spawn (ParFib (n - 1))
                let! right = ParFib (n - 2)
                let! left = Par.Await spawned
                return left + right
        }

    /// Runs `ParFib 10` synchronously, writing each log message to stdout
    /// (flushing after every line) and printing the result at the end.
    let Start () =
        let logLine (Message s) =
            stdout.WriteLine(s)
            stdout.Flush()
        async {
            let! value = Par.Start logLine (ParFib 10)
            return printfn "Computed: %i" value
        }
        |> Async.RunSynchronously
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.