Skip to content

Instantly share code, notes, and snippets.

@mrange
Created April 27, 2019 08:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mrange/121e02cd7818c1a415717846a9b4e5a9 to your computer and use it in GitHub Desktop.
Save mrange/121e02cd7818c1a415717846a9b4e5a9 to your computer and use it in GitHub Desktop.
FsCoroutines
module MinimalisticCoroutine =
/// A coroutine yielding a 'T: a wrapped function that is given a receiver
/// ('T -> unit) and calls it when the value is available.
[<Struct>]
type Coroutine<'T> = Co of (('T -> unit) -> unit)
module Coroutine =
open FSharp.Core.Printf
open System
open System.Diagnostics
open System.Threading
open System.Threading.Tasks
/// Internal helpers shared by the coroutine combinators.
module Details =
  /// Hand-off state between a started child coroutine and the receiver that
  /// will eventually consume its value (used by `run`).
  type ChildState<'T> =
    /// Neither a value nor a receiver has arrived yet.
    | Initial
    /// A receiver is waiting for the value.
    | HasReceiver of ('T -> unit)
    /// The value is waiting for a receiver.
    | HasValue of 'T
    /// The value has been handed to a receiver.
    | Done

  /// Stopwatch started when this module is initialised; `time` reads it.
  let sw = Stopwatch.StartNew ()
open Details
/// Coroutine that passes `v` to its receiver as soon as it is invoked.
let result v =
  Co (fun receiver -> receiver v)
/// Sequences two coroutines: runs `c`, feeds its value to `f`, then runs the
/// coroutine `f` returns, delivering that coroutine's value to the receiver.
let bind (Co c) f =
  Co (fun receiver ->
    c (fun v ->
      let (Co next) = f v
      next receiver))
/// Runs `c`, discards its value, then runs `d` and delivers `d`'s value.
let combine (Co c) (Co d) =
  Co (fun receiver ->
    c (fun _ -> d receiver))
/// Applicative apply: `c` yields a function, `d` yields its argument; the
/// receiver gets the function applied to the argument. `c` runs before `d`.
let apply (Co c) (Co d) =
  Co (fun receiver ->
    c (fun f ->
      d (fun v -> receiver (f v))))
/// Transforms the value produced by a coroutine with `m`.
let map m (Co c) =
  Co (fun receiver ->
    c (fun v -> v |> m |> receiver))
/// Repeatedly runs `uf state`, starting from `z`. Each `Some (nextState, v)`
/// appends `v` to the output and continues from `nextState`; the first `None`
/// ends the loop and delivers everything collected so far as an array.
let unfold uf z =
  Co (fun receiver ->
    // Fresh buffer per invocation of the coroutine.
    let collected = ResizeArray 16
    let rec step outcome =
      match outcome with
      | Some (s, v) ->
        collected.Add v
        let (Co next) = uf s
        next step
      | None ->
        receiver (collected.ToArray ())
    let (Co first) = uf z
    first step)
/// Wraps a coroutine so that a line tagged with `nm` is printed to stdout
/// when it is invoked and again (with the value) when it produces its result.
let debug nm (Co c) =
  Co (fun receiver ->
    printfn "DEBUG - %s - INVOKE" nm
    c (fun v ->
      printfn "DEBUG - %s - RESULT: %A" nm v
      receiver v))
/// `debug` with a printf-style format: the formatted string becomes the tag.
let debugf fmt = kprintf (fun nm -> debug nm) fmt
/// Measures a coroutine: the receiver gets `(elapsed, value)`, where `elapsed`
/// is the difference between two `sw.ElapsedMilliseconds` readings, one taken
/// at invocation and one taken when the value arrives.
let time (Co c) =
  Co (fun receiver ->
    let startedAt = sw.ElapsedMilliseconds
    c (fun v ->
      let finishedAt = sw.ElapsedMilliseconds
      receiver (finishedAt - startedAt, v)))
/// Coroutine that resumes its receiver on a freshly started background thread
/// named "Coroutine thread".
let switchToNewThread =
  Co (fun receiver ->
    let entry = ThreadStart (fun () -> receiver ())
    let thread = Thread (entry, IsBackground = true, Name = "Coroutine thread")
    thread.Start ())
/// Coroutine that resumes its receiver from a thread-pool work item.
let switchToThreadPool =
  Co (fun receiver ->
    let workItem = WaitCallback (fun _ -> receiver ())
    // The bool returned by QueueUserWorkItem is deliberately dropped.
    ignore (ThreadPool.QueueUserWorkItem workItem))
/// Flattens a coroutine that yields another coroutine: runs the outer one,
/// then the inner one, and delivers the inner value.
let join (Co c) =
  Co (fun receiver ->
    c (fun (Co inner) -> inner receiver))
/// Starts `c` as soon as the returned coroutine is invoked and delivers, to
/// the receiver, a second coroutine that yields the child's value.
///
/// The child's value and the consumer's receiver may arrive in either order;
/// a `ChildState` cell guarded by a lock pairs them up:
///   * value first    -> stored until a receiver attaches;
///   * receiver first -> stored until the value arrives;
///   * a newer value/receiver replaces a stored one of the same kind;
///   * once paired (`Done`) later values and receivers are ignored.
/// The receiver is always called outside the lock.
let run (Co c) =
  Co <| fun receiver ->
    let gate = obj ()
    let mutable state = Initial

    // Called by the child when it produces its value.
    let onChildValue v =
      let waiting =
        lock gate (fun () ->
          match state with
          | Initial
          | HasValue _ ->
            state <- HasValue v
            ValueNone
          | HasReceiver k ->
            state <- Done
            ValueSome k
          | Done ->
            ValueNone)
      match waiting with
      | ValueSome k -> k v
      | ValueNone -> ()

    // Kick off the child before handing out the handle.
    c onChildValue

    // Called when somebody invokes the handle coroutine.
    let attach k =
      let ready =
        lock gate (fun () ->
          match state with
          | Initial
          | HasReceiver _ ->
            state <- HasReceiver k
            ValueNone
          | HasValue v ->
            state <- Done
            ValueSome v
          | Done ->
            ValueNone)
      match ready with
      | ValueSome v -> k v
      | ValueNone -> ()

    receiver (Co attach)
/// Runs every coroutine in `cos` and delivers an array of their values, each
/// stored at the index its coroutine had in `cos`.
///
/// `parallism` (sic) is the chunk size: `cos` is split into consecutive chunks
/// of at most that many coroutines. Every member of a chunk is started through
/// `switchToThreadPool`; the next chunk starts only after the last member of
/// the current chunk has reported its value.
///
/// The chunking is done eagerly, when `runAll` is called, so whatever
/// `Array.chunkBySize` does for an invalid chunk size happens at that point.
let runAll parallism (cos : _ array) =
  let chunks = cos |> Array.indexed |> Array.chunkBySize parallism
  Co <| fun r ->
    // Allocated per invocation: a shared array would be overwritten (and
    // aliased between receivers) if the returned coroutine were run twice.
    let result = Array.zeroCreate cos.Length
    let rec oloop i =
      if i < chunks.Length then
        let chunk = chunks.[i]
        let children =
          chunk
          |> Array.map (fun (j, c) ->
            run (combine switchToThreadPool c) |> join |> map (fun v -> j, v))
        // Set before any child starts; children decrement it from pool threads.
        let mutable remaining = children.Length
        let cr (j, v) =
          result.[j] <- v
          // Whichever child finishes last moves on to the next chunk.
          if Interlocked.Decrement &remaining = 0 then oloop (i + 1)
        for (Co c) in children do
          c cr
      else
        r result
    oloop 0
/// Adapts a `Task<'T>` to a coroutine: registers a continuation on the task
/// that passes `Result` to the receiver.
let ofTask (t : Task<_>) =
  Co (fun receiver ->
    let continuation =
      Action<Task<'T>> (fun (completed : Task<'T>) -> receiver completed.Result)
    // The continuation task itself is not observed.
    ignore (t.ContinueWith (continuation)))
/// Adapts a non-generic `Task` to a `Coroutine<unit>`: registers a
/// continuation on the task that calls the receiver with `()`.
let ofUnitTask (t : Task) =
  Co (fun receiver ->
    let continuation = Action<Task> (fun (_ : Task) -> receiver ())
    // The continuation task itself is not observed.
    ignore (t.ContinueWith (continuation)))
/// Starts a coroutine from a thread-pool work item, with `d` as the receiver
/// of its value. Returns immediately.
let invoke (Co c) d =
  let workItem = WaitCallback (fun _ -> c d)
  ignore (ThreadPool.QueueUserWorkItem workItem)
/// Computation-expression builder behind `coroutine { ... }`.
/// `let!`/`do!` accept coroutines, `Task<'T>` and `Task`; `return!` accepts
/// coroutines and `Task<'T>`.
type Builder () =
  class
    /// `let!` on a coroutine.
    member x.Bind (c, f) : Coroutine<_> = bind c f
    /// `let!` on a `Task<'T>` (adapted with `ofTask`).
    member x.Bind (t, f) : Coroutine<_> = bind (ofTask t) f
    /// `let!`/`do!` on a `Task` (adapted with `ofUnitTask`).
    member x.Bind (t, f) : Coroutine<_> = bind (ofUnitTask t) f
    /// Sequencing of two coroutines, discarding the first value.
    member x.Combine (c, d) : Coroutine<_> = combine c d
    /// Sequencing where the first step is a `Task`.
    member x.Combine (t, d) : Coroutine<_> = combine (ofUnitTask t) d
    /// Defers the body of a `coroutine { ... }` block until the coroutine is
    /// invoked. Without this member the code preceding the first `let!`
    /// (e.g. starting a download task) ran when the coroutine value was
    /// *built*, so work started before `runAll` could limit it or `time`
    /// could take its first reading. The body is re-evaluated on every
    /// invocation.
    member x.Delay (f : unit -> Coroutine<_>) : Coroutine<_> =
      Co <| fun r ->
        let (Co c) = f ()
        c r
    /// `return v`.
    member x.Return v : Coroutine<_> = result v
    /// `return!` on a coroutine.
    member x.ReturnFrom c : Coroutine<_> = c
    /// `return!` on a `Task<'T>`.
    member x.ReturnFrom t : Coroutine<_> = ofTask t
    /// Implicit `else`/empty branch: a coroutine yielding `()`.
    member x.Zero () : Coroutine<_> = result ()
  end
/// Builder instance that enables the `coroutine { ... }` syntax.
let coroutine = new Coroutine.Builder ()
// Infix operators over Coroutine<'T>; each forwards to the function of the
// same purpose in the Coroutine module.
type Coroutine<'T> with
  /// `c >>= f` is `Coroutine.bind c f`. (The arguments were previously passed
  /// as `bind f c`, which put the continuation where the coroutine belongs.)
  static member (>>=) (c, f) = Coroutine.bind c f
  /// `c >>. d` runs `c`, drops its value, then runs `d`.
  static member (>>.) (c, d) = Coroutine.combine c d
  /// `c <*> d` applies the function yielded by `c` to the value yielded by `d`.
  static member (<*>) (c, d) = Coroutine.apply c d
  /// `c |>> m` maps `m` over the value yielded by `c`.
  static member (|>>) (c, m) = Coroutine.map m c
open MinimalisticCoroutine
open System
open System.Net
open System.Threading.Tasks
/// Smallest possible coroutine: yields the constant 1.
let example1 = coroutine { return 1 }
/// Downloads the resource at `uri` (a string) as text using `WebClient`.
let downloadFromUri uri =
  coroutine {
    let wc = new WebClient ()
    let download = wc.DownloadStringTaskAsync (Uri uri)
    // Dispose the client when the task finishes, whatever its outcome. The
    // previous `wc.Dispose ()` after the `let!` was skipped whenever the
    // download faulted, because the rest of the coroutine never ran.
    download.ContinueWith (Action<Task<string>> (fun _ -> wc.Dispose ())) |> ignore
    let! txt = download
    return txt
  }
/// Times a download of the Google front page; yields (elapsed, text length).
let example2 =
  coroutine {
    let! (elapsed, body) = downloadFromUri "https://www.google.com/" |> Coroutine.time
    return (elapsed, body.Length)
  }
/// Times a download of `uri`; yields (nm, elapsed, text length), where `nm`
/// is just a label carried through to the result.
let measureDownloadFromUri (nm : string) uri =
  coroutine {
    let! (elapsed, body) = uri |> downloadFromUri |> Coroutine.time
    return (nm, elapsed, body.Length)
  }
/// Measures Google, then Bing, one after the other; yields the Bing
/// measurement first and the Google measurement second.
let example3 =
  coroutine {
    let! googleStats = measureDownloadFromUri "google" "https://www.google.com/"
    let! bingStats = measureDownloadFromUri "bing" "https://www.bing.com"
    return (bingStats, googleStats)
  }
/// Measures ten gist listing pages (page=1..10) through `runAll` with a chunk
/// size of 2, each download wrapped in `debugf` tracing, and times the whole.
let example4 =
  let pageUris =
    Array.init 10 (fun i -> sprintf "https://gist.github.com/mrange?page=%d" (i + 1))
  let traced uri =
    measureDownloadFromUri "gist" uri
    |> Coroutine.debugf "Download: %s" (string uri)
  pageUris
  |> Array.map traced
  |> Coroutine.runAll 2
  |> Coroutine.time
/// Entry point: starts `example4`, prints its result when it arrives, and
/// keeps the process alive until a key is pressed.
[<EntryPoint>]
let main argv =
  let report v = printfn "Result: %A" v
  Coroutine.invoke example4 report
  // invoke returns immediately; block here so the process does not exit.
  ignore (System.Console.ReadKey ())
  0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment