Skip to content

Instantly share code, notes, and snippets.

@hodzanassredin
Last active May 13, 2020 22:16
Show Gist options
  • Save hodzanassredin/21e350bd8602c4432f01 to your computer and use it in GitHub Desktop.
Save hodzanassredin/21e350bd8602c4432f01 to your computer and use it in GitHub Desktop.
Hopac-style alternatives in F#
open System
/// Extra combinators for F# `Async` workflows.
module AsyncExt =
    open System.Threading

    /// Starts every workflow in `workflows` concurrently and completes with the
    /// outcome of whichever one finishes first.
    ///
    /// - If the first workflow to finish produced a value, that value is passed
    ///   to the success continuation.
    /// - If the first workflow to finish raised an exception, that exception is
    ///   passed to the exception continuation, so the caller observes the
    ///   failure instead of it being raised unobserved on the thread pool.
    /// - Once any workflow finishes, the shared cancellation token is cancelled
    ///   to ask the remaining workflows to stop; any outcome they still
    ///   produce is ignored.
    /// - If `workflows` is empty nothing is started and the returned
    ///   computation never completes.
    let choice workflows =
        Async.FromContinuations(fun (cont, econt, _) ->
            let cts = new CancellationTokenSource()
            let completed = ref false
            let lockObj = new obj()
            let synchronized f = lock lockObj f

            // Runs `continuation` for the first caller only; every later call
            // is a no-op, so exactly one continuation is ever invoked.
            let completeOnce continuation =
                let run =
                    synchronized (fun () ->
                        if completed.Value then
                            false
                        else
                            completed.Value <- true
                            true)
                if run then continuation ()

            let runWorkflow workflow =
                async {
                    // Async.Catch reifies a failure as a value so that it can
                    // take part in the race like a normal result.
                    let! outcome = Async.Catch workflow
                    cts.Cancel()
                    match outcome with
                    | Choice1Of2 res -> completeOnce (fun () -> cont res)
                    | Choice2Of2 error -> completeOnce (fun () -> econt error)
                }

            for work in workflows do
                Async.Start(runWorkflow work, cts.Token))
/// A write-once value exposed as a `signal` function (the writer side) and a
/// `future` workflow (the reader side).
module Promise =
    open System.Threading.Tasks

    /// `signal` supplies the value; `future` is the workflow that yields it.
    type Promise<'a> = { signal: 'a -> unit; future: Async<'a> }

    /// Creates a promise backed by a `TaskCompletionSource`: `signal` sets the
    /// task's result and `future` awaits the task.
    let create<'a> () : Promise<'a> =
        let source = new TaskCompletionSource<'a>()
        { signal = (fun value -> source.SetResult value)
          future = Async.AwaitTask source.Task }

    /// Creates a promise whose `signal` discards its argument, so the backing
    /// task is never given a result through this promise.
    let never<'a> () : Promise<'a> =
        let unresolved = create<'a> ()
        { signal = ignore; future = unresolved.future }
/// Selective-communication style "alternatives" built on top of `Async`,
/// `Promise` and `AsyncExt.choice`.
module Alt =
    open System
    open Promise
    open System.Threading

    /// An alternative: given a negative-acknowledgement ("nack") workflow, it
    /// yields the workflow that produces the alternative's value.
    type Alt<'a> = Alt of (Async<unit> -> Async<'a>)

    /// Builds an alternative from a function that receives the nack workflow.
    let withNack<'a> (f: Async<unit> -> Async<'a>) : Alt<'a> = Alt(f)

    /// Lifts a plain workflow into an alternative that ignores its nack.
    let guard (f: Async<'a>) : Alt<'a> = withNack (fun _ -> f)

    /// An alternative whose workflow waits on a promise that is never signalled.
    let never<'a> () = guard <| Promise.never<'a>().future

    /// An alternative that returns `x` without waiting.
    let always x = guard <| async { return x }

    /// Unwraps the function held by an alternative.
    let extract (Alt(a)) = a

    /// Races `a` against `b` and yields the value of whichever finishes first.
    ///
    /// Each branch is given its own nack: the first of a private promise and
    /// the outer `nack`. When a branch produces a value it signals the other
    /// branch's private promise, so the other branch sees its nack fire.
    let choose (a: Alt<'a>, b: Alt<'a>) =
        Alt(fun nack ->
            let nack1 = Promise.create()
            let nack2 = Promise.create()

            let wrkfl1 =
                async {
                    let! v = extract a (AsyncExt.choice [nack1.future; nack])
                    // `a` produced a value: tell `b` it was not chosen.
                    nack2.signal()
                    return v
                }

            let wrkfl2 =
                async {
                    let! v = extract b (AsyncExt.choice [nack2.future; nack])
                    // `b` produced a value: tell `a` it was not chosen.
                    nack1.signal()
                    return v
                }

            AsyncExt.choice [|wrkfl1; wrkfl2|])

    /// Curried form of `choose`.
    let chooseC a b = choose (a, b)

    /// Folds `choose` over `s` from the right, seeded with `never`; an empty
    /// sequence therefore yields an alternative that never completes.
    let select (s: seq<Alt<'a>>) = Seq.foldBack chooseC s (never<'a>())

    // Default-seeded generator. Seeding from DateTime.Now.Millisecond would
    // allow only 1000 distinct seeds and hence only 1000 distinct shuffles.
    let private rand = new System.Random()

    /// Reorders `s` by sorting on a freshly drawn random key per element.
    let shuffle s = Seq.sortBy (fun _ -> rand.Next()) s

    /// Maps the value produced by an alternative through `f`.
    let wrap ((Alt(a)), f) =
        Alt(fun nack ->
            async {
                let! x = a nack
                return f x
            })

    /// Turns an alternative into a workflow by supplying a nack that never fires.
    let run<'a> (a: Alt<'a>) =
        extract a (Promise.never<unit>().future)

    /// Runs an alternative to completion on the calling thread and returns its value.
    let sync<'a> (a: Alt<'a>) =
        Async.RunSynchronously(run a)
open Alt
/// Synchronously runs an alternative and prints the value it produced.
let test alt =
    alt |> sync |> printfn "resut is %A"
/// Workflow that sleeps for `x` milliseconds and then prints `x`.
let time x =
    async {
        do! Async.Sleep x
        printfn "time %d" x
    }
/// Decorates an alternative so that `nacked <name>` is printed when the nack
/// workflow handed to it completes.
let add_nack name (Alt(x)) =
    withNack (fun nack ->
        // Background watcher: waits for the nack, then reports it.
        let reportNack =
            async {
                do! nack
                printfn "nacked %s" name
            }

        async {
            // Fire-and-forget the watcher, then continue as the wrapped alternative.
            Async.Start reportNack
            return! x nack
        })
// Race two immediately-available alternatives; the first to finish is printed.
test (select [ always 1; always 2 ])

// Same race, with the candidates put in random order first.
test (select (shuffle [ always 1; always 2 ]))

// Pairwise choice between alternatives that report when they are nacked.
test (choose (add_nack "always(1)" (always 1), add_nack "always(2)" (always 2)))

// The same nack-reporting alternatives, combined through `select`.
test (select [ add_nack "always(1)" (always 1); add_nack "always(2)" (always 2) ])
/// Checks that wrapping twice (`g`, then `f`) gives the same result as
/// wrapping once with the composition `g >> f`.
let test1 ev g f =
    let nested = (wrap (ev, g), f) |> wrap |> sync
    let fused = (ev, g >> f) |> wrap |> sync
    nested = fused
/// Checks that wrapping a choice with `f` gives the same result as choosing
/// between the two alternatives each wrapped with `f`.
let test2 ev1 ev2 f =
    let wrappedChoice = (choose (ev1, ev2), f) |> wrap |> sync
    let choiceOfWrapped = (wrap (ev1, f), wrap (ev2, f)) |> choose |> sync
    wrappedChoice = choiceOfWrapped
// Run both equivalence checks and print their boolean results.
printfn "test1 %b" (test1 (always 1) (fun x -> x + 1) (fun x -> 2 * x))
printfn "test2 %b" (test2 (always 1) (never ()) (fun x -> x + 1))
// Ack server: a workaround for the lack of channels.
/// A server state: wraps an alternative that yields the next server state.
type Server<'a> = Server of Alt<Server<'a>>

/// Returns the alternative wrapped by a `Server`.
let unwrap_server (server: Server<'a>) : Alt<Server<'a>> =
    match server with
    | Server step -> step
/// Random source used for the polling delays in `get` and `put`.
/// Default-seeded: seeding from `DateTime.Now.Millisecond` would allow only
/// 1000 distinct seeds.
// NOTE(review): this single instance is used by `get` and `put`, which can run
// concurrently on different threads, and System.Random instances are not
// thread-safe. Consider guarding calls with a lock.
let rand = new System.Random()
/// Builds a server state holding the counter `x`.
///
/// Synchronising on the server races two outcomes: the nack completing, which
/// keeps the counter at `x`, and `send_resp x` completing, which advances it
/// to `x + 1`. The result is the server state for the resulting counter.
let rec server (x: int) (send_resp: int -> Async<unit>) : Server<int> =
    let step nack =
        async {
            let onNack = wrap (guard nack, fun _ -> x)
            let onSent = wrap (guard (send_resp x), fun _ -> x + 1)
            let! next = run (Alt.choose (onNack, onSent))
            return server next send_resp
        }

    Server(withNack step)
/// Repeatedly synchronises on whichever of the two servers responds first,
/// then receives one message through `rcv_resp` and prepends it to `acc`.
/// Stops and returns `acc` once it holds more than 10 messages, so the most
/// recently received message comes first.
let rec poll_client
    (rcv_resp: unit -> Async<int>)
    (server: Server<int>)
    (server2: Server<int>)
    (acc: int list)
    : Async<int list> =
    async {
        if List.length acc > 10 then
            return acc
        else
            // Whichever server advances is paired with the untouched other one.
            let fromFirst = wrap (unwrap_server server, fun advanced -> (advanced, server2))
            let fromSecond = wrap (unwrap_server server2, fun advanced -> (advanced, server))
            let! s1, s2 = run (choose (fromFirst, fromSecond))
            let! msg = rcv_resp ()
            return! poll_client rcv_resp s1 s2 (msg :: acc)
    }
/// One-slot buffer shared by `get` and `put`; `None` means the slot is empty.
let queue = ref (None: int option)

/// Runs `f` while holding the lock on `queue`.
let synchronized (f: unit -> 'a) : 'a = lock queue f
/// Polls the shared slot: sleeps for a random delay below one second, then
/// takes the value if one is present (emptying the slot), otherwise retries.
let rec get () =
    async {
        do! Async.Sleep(rand.Next(1000))

        // Read and clear the slot under the lock so that a value is taken at most once.
        let taken =
            synchronized (fun () ->
                let current = queue.Value
                if Option.isSome current then queue.Value <- None
                current)

        match taken with
        | Some value -> return value
        | None -> return! get ()
    }
/// Offers `x` to the shared slot: sleeps for a random delay below one second,
/// then stores `x` if the slot is empty, otherwise retries.
let rec put x =
    async {
        do! Async.Sleep(rand.Next(1000))

        // Check and fill the slot under the lock so that a pending value is never overwritten.
        let stored =
            synchronized (fun () ->
                match queue.Value with
                | None ->
                    queue.Value <- Some x
                    true
                | Some _ -> false)

        if stored then
            return ()
        else
            return! put x
    }
// Two servers, with counters starting at 0 and 1000, both sending through `put`.
let in_serv = server 0 put
let in_serv2 = server 1000 put

// A client that polls both servers and receives through `get`.
let client = poll_client get in_serv in_serv2 []

// Run the client to completion and print the collected messages.
client |> Async.RunSynchronously |> printf "result %A"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment