Last active
May 13, 2020 22:16
-
-
Save hodzanassredin/21e350bd8602c4432f01 to your computer and use it in GitHub Desktop.
hopac alternatives in fsharp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open System | |
module AsyncExt =
    open System.Threading

    /// Races the given workflows and completes with the outcome of the first
    /// one to finish.
    ///
    /// Every workflow is started with `Async.Start` under one shared
    /// cancellation token. As soon as any workflow finishes, cancellation is
    /// requested for the others and the outcome is reported exactly once:
    /// a value goes to the success continuation, a failure goes to the
    /// exception continuation. Outcomes of workflows that finish later are
    /// dropped.
    ///
    /// An empty `workflows` sequence starts nothing, so the resulting
    /// workflow never completes.
    let choice workflows =
        Async.FromContinuations(fun (cont, econt, _) ->
            // NOTE(review): the token source is never disposed; losers may
            // still observe the token after the winner has reported.
            let cts = new CancellationTokenSource()
            let completed = ref false
            let lockObj = new obj()

            // Runs `report` only for the first caller; later callers are ignored.
            let completeOnce (report: unit -> unit) =
                let isFirst =
                    lock lockObj (fun () ->
                        if completed.Value then
                            false
                        else
                            completed.Value <- true
                            true)
                if isFirst then report ()

            let runWorkflow workflow = async {
                // Async.Catch turns a failure into a value, so it can be handed
                // to `econt` instead of escaping from Async.Start unhandled.
                let! outcome = Async.Catch workflow
                cts.Cancel()
                match outcome with
                | Choice1Of2 res -> completeOnce (fun () -> cont res)
                | Choice2Of2 err -> completeOnce (fun () -> econt err) }

            for work in workflows do
                Async.Start(runWorkflow work, cts.Token))
module Promise =
    open System.Threading.Tasks

    /// A write-once cell: `signal` supplies the value, `future` awaits it.
    type Promise<'a> = {signal: 'a -> unit; future : Async<'a>}

    /// Creates a fresh promise backed by a `TaskCompletionSource`.
    /// The first call to `signal` completes `future`; further calls are
    /// ignored rather than throwing.
    let create<'a> () =
        let tcs = new TaskCompletionSource<'a>()
        let ta: Async<'a> = Async.AwaitTask tcs.Task
        // TrySetResult (unlike SetResult) does not raise
        // InvalidOperationException when the task is already completed.
        {signal = (fun value -> tcs.TrySetResult value |> ignore); future = ta}

    /// Creates a promise whose `signal` does nothing, so its `future`
    /// is never completed.
    let never<'a> () = {create<'a>() with signal = (fun _ -> ())}
module Alt =
    open System
    open Promise
    open System.Threading

    /// An alternative: given a negative-acknowledgement ("nack") workflow,
    /// produces the workflow that computes the alternative's value.
    type Alt<'a> = Alt of (Async<unit> -> Async<'a>)

    /// Builds an alternative from a function that receives the nack workflow.
    let withNack<'a> (f: Async<unit> -> Async<'a>) : Alt<'a> = Alt(f)

    /// Lifts a plain workflow into an alternative that ignores its nack.
    let guard (f: Async<'a>) : Alt<'a> = withNack(fun _ -> f)

    /// An alternative built on a promise that is never signalled.
    let never<'a>() = guard <| Promise.never<'a>().future

    /// An alternative that immediately returns `x`.
    let always x = guard <| async{return x}

    /// Unwraps the function stored inside an alternative.
    let extract (Alt(a)) = a

    /// Races two alternatives. Each branch receives a nack that completes when
    /// either its private promise is signalled or the outer `nack` completes.
    /// When a branch produces a value it signals the other branch's promise
    /// and returns; the overall result is whichever branch workflow finishes
    /// first according to `AsyncExt.choice`.
    let choose (a:Alt<'a>, b:Alt<'a>) =
        Alt(fun nack ->
            let nack1 = Promise.create()
            let nack2 = Promise.create()
            let wrkfl1 = async{
                let! v = a |> extract <| (AsyncExt.choice [nack1.future;nack])
                nack2.signal()
                return v
            }
            let wrkfl2 = async{
                let! v = b |> extract <| (AsyncExt.choice [nack2.future;nack])
                nack1.signal()
                return v
            }
            AsyncExt.choice [|wrkfl1;wrkfl2|])

    /// Curried form of `choose`.
    let chooseC a b = choose(a,b)

    /// Races every alternative in `s` by folding `chooseC` from the right,
    /// starting from `never`; an empty sequence therefore yields `never`.
    let select (s : seq<Alt<'a>>) = Seq.foldBack chooseC s (never<'a>())

    // Default-seeded generator. Seeding with DateTime.Now.Millisecond allowed
    // only 1000 distinct seeds (0-999).
    let private rand = new System.Random()

    /// Reorders `s` by sorting on a random key drawn per element.
    let shuffle s = Seq.sortBy(fun _ -> rand.Next()) s

    /// Post-processes the value of an alternative with `f`.
    let wrap ((Alt(a)),f) = Alt(fun nack -> async{
        let! x = a(nack)
        return f(x)})

    /// Turns an alternative into a workflow by supplying a nack that
    /// never completes.
    let run<'a> (a:Alt<'a>) =
        a|> extract <| Promise.never<unit>().future

    /// Runs an alternative to completion on the calling thread and returns
    /// its value. Defined via `run` so both share the same nack handling.
    let sync<'a> (a:Alt<'a>) =
        Async.RunSynchronously(run a)
open Alt | |
/// Runs the alternative synchronously and prints its result.
/// Written with an explicit argument so the function stays generic instead of
/// being fixed to a single result type by its first use.
let test alt = sync alt |> printfn "result is %A"
/// Workflow that sleeps for `x` milliseconds and then prints the delay.
let time x =
    async {
        do! Async.Sleep(x)
        printfn "time %d" x
    }
/// Decorates an alternative with tracing: when the alternative is run, a
/// detached workflow is started that waits for the nack and then prints
/// "nacked <name>". The underlying alternative runs with the same nack.
let add_nack name (Alt(x)) =
    withNack (fun nack ->
        // Listener that reports once the nack workflow completes.
        let report_nack = async {
            do! nack
            printfn "nacked %s" name
        }
        async {
            Async.Start report_nack
            return! x nack
        })
// Two always-ready alternatives combined with `select`; prints the winner.
test (select [ always 1; always 2 ])
// The same candidates, reordered by `shuffle` before selecting.
test (select (shuffle [ always 1; always 2 ]))
// Binary `choose` over traced alternatives; a "nacked ..." line is printed
// whenever an alternative's nack fires.
test (choose (add_nack "always(1)" (always 1), add_nack "always(2)" (always 2)))
// The same traced alternatives combined through `select`.
test (select [ add_nack "always(1)" (always 1); add_nack "always(2)" (always 2) ])
/// Checks that wrapping `ev` with `g` and then `f` gives the same result as
/// wrapping it once with the composition `g >> f`.
let test1 ev g f =
    let nested = wrap (wrap (ev, g), f) |> sync
    let fused = wrap (ev, g >> f) |> sync
    nested = fused
/// Checks that wrapping a choice with `f` gives the same result as choosing
/// between the individually wrapped alternatives.
let test2 ev1 ev2 f =
    let wrapped_choice = wrap (choose (ev1, ev2), f) |> sync
    let choice_of_wrapped = choose (wrap (ev1, f), wrap (ev2, f)) |> sync
    wrapped_choice = choice_of_wrapped
// Run both law checks on simple integer alternatives and print the verdicts.
printfn "test1 %b" (test1 (always 1) (fun n -> n + 1) (fun n -> 2 * n))
printfn "test2 %b" (test2 (always 1) (never ()) (fun n -> n + 1))
// "ack" server, used as a workaround for the lack of channels
/// A server is an alternative that, once it completes, yields the next
/// server to use.
type Server<'a> = Server of Alt<Server<'a>>

/// Extracts the alternative held by a server.
let unwrap_server server =
    let (Server next_step) = server
    next_step
// Shared generator for the random delays in `get` and `put`.
// Uses the default seed: seeding with DateTime.Now.Millisecond allowed only
// 1000 distinct seeds (0-999).
// NOTE: System.Random instances are not thread-safe; serialize access when
// calling from concurrently running workflows.
let rand = new System.Random()
/// Builds a server holding the counter `x`. When its alternative is run it
/// races the nack against `send_resp x`: if the nack wins the counter stays
/// at `x`, if the send wins it becomes `x + 1`. The alternative then yields
/// a new server for the resulting counter.
let rec server (x:int) (send_resp: int -> Async<unit>) : Server<int> =
    let step nack = async {
        let withdrawn = wrap (guard nack, fun _ -> x)
        let delivered = wrap (guard (send_resp x), fun _ -> x + 1)
        let! next = run (Alt.choose (withdrawn, delivered))
        return server next send_resp
    }
    Server (withNack step)
/// Repeatedly races the two servers, then receives one message via
/// `rcv_resp` and prepends it to `acc`. Stops and returns `acc` once it holds
/// more than 10 messages. After each round the server that advanced is passed
/// on first and the other one second.
let rec poll_client (rcv_resp: unit -> Async<int>) (server:Server<int>) (server2:Server<int>) (acc: int list) : Async<int list> =
    // Pairs the next state of `active` with the untouched `idle` server.
    let advance active idle =
        wrap (unwrap_server active, fun next -> (next, idle))
    async {
        if List.length acc > 10 then
            return acc
        else
            let! s1, s2 = run (choose (advance server server2, advance server2 server))
            let! msg = rcv_resp ()
            return! poll_client rcv_resp s1 s2 (msg :: acc)
    }
/// Single-slot buffer: `None` when empty, `Some v` when it holds a value.
let queue = ref (None : int option)

/// Runs `critical` while holding the lock on `queue`.
let synchronized critical = lock queue critical
/// Takes the value out of `queue`. Each attempt first sleeps for a random
/// delay below 1000 ms, then atomically empties the slot; if the slot was
/// empty the attempt is repeated.
let rec get () = async {
    // `rand` is also used by `put` loops that may be running concurrently and
    // System.Random is not thread-safe, so draw the delay under the lock.
    let delay = synchronized (fun () -> rand.Next(1000))
    do! Async.Sleep(delay)
    let taken =
        synchronized (fun () ->
            let current = queue.Value
            if current.IsSome then queue.Value <- None
            current)
    match taken with
    | Some value -> return value
    | None -> return! get ()
}
/// Stores `x` in `queue`. Each attempt first sleeps for a random delay below
/// 1000 ms, then atomically fills the slot if it is empty; if the slot was
/// occupied the attempt is repeated.
let rec put x = async {
    // Several `put` loops can run at once and System.Random is not
    // thread-safe, so draw the delay under the lock.
    let delay = synchronized (fun () -> rand.Next(1000))
    do! Async.Sleep(delay)
    let stored =
        synchronized (fun () ->
            match queue.Value with
            | None ->
                queue.Value <- Some(x)
                true
            | Some _ -> false)
    if stored then return ()
    else return! put x
}
// Two servers with different starting counters, both sending through `put`.
let in_serv = server 0 put
let in_serv2 = server 1000 put
// Client that polls both servers and reads the messages back with `get`.
let client = poll_client get in_serv in_serv2 []
client |> Async.RunSynchronously |> printf "result %A"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment