Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aaronmu/b015b71a83292f870934dcbd6529f330 to your computer and use it in GitHub Desktop.
Save aaronmu/b015b71a83292f870934dcbd6529f330 to your computer and use it in GitHub Desktop.
IO
namespace Fsion
open System.Threading
[<Struct;NoEquality;NoComparison>]
type Cancel =
private
| Cancel of bool ref * children: Cancel list ref
module internal Cancel =
let isSet (_:'r,Cancel(i,_)) = !i
let create() = Cancel(ref false, ref [])
let add (r:'r,Cancel(_,c)) =
let i = create()
c := i::!c
r,i
let rec set (r,Cancel(me,kids)) =
me := true
List.iter (fun i -> set(r,i)) !kids
[<NoEquality;NoComparison>]
type UIO<'r,'a> =
| UIO of ('r * Cancel -> ('a option -> unit) -> unit)
member m.Bind(f:'a->UIO<'r,'b>) : UIO<'r,'b> =
let (UIO run) = m
UIO (fun env cont ->
if Cancel.isSet env then cont None
else
run env (fun o ->
if Cancel.isSet env then cont None
else
match Option.map f o with
| None -> cont None
| Some(UIO run) ->
if Cancel.isSet env then cont None
else run env cont
)
)
module UIO =
let result a : UIO<'r,'a> =
UIO (fun env cont ->
if Cancel.isSet env then cont None
else cont (Some a)
)
let effect (f:'r->'a) : UIO<'r,'a> =
UIO (fun env cont ->
if Cancel.isSet env then cont None
else
let a = fst env |> f
if Cancel.isSet env then cont None
else Some a |> cont
)
let map (f:'a->'b) (UIO run) : UIO<'r,'b> =
UIO (fun env cont ->
if Cancel.isSet env then cont None
else run env (fun o ->
if Cancel.isSet env then cont None
else
match Option.map f o with
| None -> cont None
| Some b ->
if Cancel.isSet env then cont None
else cont (Some b)
)
)
let delay milliseconds : UIO<'r,unit> =
UIO (fun env cont ->
if Cancel.isSet env then cont None
else
let mutable t = Unchecked.defaultof<_>
t <- new Timer((fun _ ->
t.Dispose()
if Cancel.isSet env then cont None
else cont (Some())
), null, milliseconds, Timeout.Infinite)
)
let flatten f : UIO<'r,'a> =
UIO (fun env cont ->
if Cancel.isSet env then cont None
else
let (UIO run) = fst env |> f
if Cancel.isSet env then cont None
else run env (fun t ->
if Cancel.isSet env then cont None
else cont t
)
)
let toAsync (env:'r) (UIO run) : Async<'a> =
Async.FromContinuations(fun (cont,_,_) ->
run (env,Cancel.create()) (fun o ->
cont o.Value
)
)
let fork (UIO run) : UIO<'r,UIO<'r,'a>> =
UIO (fun env contFork ->
if Cancel.isSet env then contFork None
else
let mutable o = null
threadpool (fun _ ->
run env (fun a ->
let o = Interlocked.CompareExchange(&o, a, null)
if isNull o |> not then
let cont = o :?> 'a Option->unit
if Cancel.isSet env then cont None
else cont a
)
)
UIO (fun env cont ->
let o = Interlocked.CompareExchange(&o, cont, null)
if Cancel.isSet env then cont None
elif isNull o |> not then
if Cancel.isSet env then cont None
else cont (o :?> 'a option)
)
|> Some
|> contFork
)
let toAsyncCancel (env:'r,cancel) (UIO run) : Async<'a option> =
Async.FromContinuations(fun (cont,_,_) ->
run (env,cancel) (fun o ->
cont o
)
)
let throttle (n:int) (ios:UIO<'r,'a>[]) : UIO<'r,'a>[] =
let mutable threads = 0
let mutable count = 0
let mutable index = -1
let runCont = Array.zeroCreate ios.Length
Array.map (fun (UIO run) ->
UIO (fun env cont ->
runCont.[Interlocked.Increment(&count)-1] <- Some(run,cont)
let rec processLoop missing =
let missing =
if Interlocked.Increment(&threads) <= n then
let rec loop missing =
let i = Interlocked.Increment(&index)
if i >= count then
Interlocked.Decrement(&index) |> ignore
List.choose (fun i ->
match runCont.[i] with
| Some(run,cont) ->
if Cancel.isSet env then cont None
else run env cont
None
| None -> Some i
) missing
else
let missing =
match runCont.[i] with
| Some(run,cont) ->
if Cancel.isSet env then cont None
else run env cont
missing
| None -> i::missing
loop missing
loop missing
else missing
let t = Interlocked.Decrement(&threads)
if List.isEmpty missing |> not || (t = 0 && index+1 < count) then
processLoop missing
processLoop []
)
) ios
let para (ios:UIO<'r,'a>[]) : UIO<'r,'a[]> =
UIO (fun env cont ->
if Cancel.isSet env then cont None
elif Array.isEmpty ios then Array.empty |> Some |> cont
else
let envChild = Cancel.add env
let results = Array.zeroCreate ios.Length
let mutable count = ios.Length
Array.iteri (fun i (UIO run) ->
threadpool (fun _ ->
run envChild (fun a ->
match a with
| Some a ->
results.[i] <- a
if Cancel.isSet env && Interlocked.Exchange(&count,-1) > 0 then
cont None
elif Interlocked.Decrement(&count) = 0 then
if Cancel.isSet env then cont None
else results |> Some |> cont
| None ->
if Interlocked.Exchange(&count,-1) > 0 then
cont None
)
)
) ios
)
let paraSum (ios:UIO<'r,int>[]) : UIO<'r,int> =
UIO (fun env cont ->
if Cancel.isSet env then cont None
elif Array.isEmpty ios then Some 0 |> cont
else
let envChild = Cancel.add env
let mutable result = 0
let mutable count = ios.Length
Array.iteri (fun i (UIO run) ->
threadpool (fun _ ->
run envChild (fun a ->
match a with
| Some a ->
Interlocked.Add(&result, a) |> ignore
if Cancel.isSet env && Interlocked.Exchange(&count,-1) > 0 then
cont None
elif Interlocked.Decrement(&count) = 0 then
if Cancel.isSet env then cont None
else Some result |> cont
| None ->
if Interlocked.Exchange(&count,-1) > 0 then
cont None
)
)
) ios
)
type ClockService =
abstract member Time : unit -> UIO<'r,Time>
abstract member Sleep : int -> UIO<'r,unit>
type Clock =
abstract member Clock : ClockService
module Clock =
let time() =
UIO.flatten (fun (c:#Clock) ->
c.Clock.Time()
)
let sleep milliseconds =
UIO.flatten (fun (c:#Clock) ->
c.Clock.Sleep milliseconds
)
let liveService =
{ new ClockService with
member __.Time() = Time.now() |> UIO.result
member __.Sleep milliseconds = UIO.delay milliseconds
}
[<Struct;NoEquality;NoComparison>]
type Decision<'a,'b> =
| Decision of cont:bool * delay:int * state:'a * (unit -> 'b)
[<Struct;NoEquality;NoComparison>]
type Schedule<'r,'s,'a,'b> =
private
| Schedule of initial:UIO<'r,'s> * update:('a * 's -> UIO<'r,Decision<'s,'b>>)
module Schedule =
let forever<'r,'a> : Schedule<'r,int,'a,int> =
Schedule (UIO.result 0, fun (_,s) -> UIO.result (Decision(true,0,s+1,(fun () -> s+1))))
let private updated (update:('a * 's -> UIO<'r,Decision<'s,'b>>) -> 'a * 's -> UIO<'r,Decision<'s,'b2>>) (Schedule(i,u)) =
Schedule (i,update u)
let private check (test:'a * 'b -> UIO<'r,bool>) m =
updated (fun update (a,s) ->
update(a,s).Bind(fun (Decision(cont,dur,a1,fb) as d) ->
if cont then test(a,fb()) |> UIO.map (fun b -> Decision(b,dur,a1,fb))
else UIO.result d
)
) m
let whileOutput (f:'b->bool) m = check (fun (_,b) -> UIO.result (f b)) m
let recurs n = whileOutput (fun i -> i <= n) forever
[<NoEquality;NoComparison>]
type IO<'r,'a,'e> =
| IO of ('r * Cancel -> (Result<'a,'e> option -> unit) -> unit)
member m.Bind(f:'a->UIO<'r,'b>) : IO<'r,'b,'e> =
let (IO run) = m
IO (fun env cont ->
if Cancel.isSet env then cont None
else
run env (fun o ->
if Cancel.isSet env then cont None
else
match o with
| None -> cont None
| Some(Ok a) ->
let (UIO run) = f a
if Cancel.isSet env then cont None
else run env (fun o ->
if Cancel.isSet env then cont None
else Option.map Ok o |> cont
)
| Some(Error e) -> cont (Some(Error e))
)
)
member m.Bind(f:'a->IO<'r,'b,'e>) : IO<'r,'b,'e> =
let (IO run) = m
IO (fun env cont ->
if Cancel.isSet env then cont None
else
run env (fun o ->
if Cancel.isSet env then cont None
else
match o with
| None -> cont None
| Some(Ok a) ->
let (IO run) = f a
if Cancel.isSet env then cont None
else run env cont
| Some(Error e) -> cont (Some(Error e))
)
)
type UIO<'r,'a> with
member m.Bind(f:'a->IO<'r,'b,'e>) : IO<'r,'b,'e> =
let (UIO run) = m
IO (fun env cont ->
if Cancel.isSet env then cont None
else
run env (fun o ->
if Cancel.isSet env then cont None
else
match Option.map f o with
| None -> cont None
| Some(IO bind) ->
if Cancel.isSet env then cont None
else bind env cont
)
)
module IO =
let ok a : IO<'r,'a,'e> =
IO (fun env cont ->
if Cancel.isSet env then cont None
else cont (Some (Ok a))
)
let error e : IO<'r,'a,'e> =
IO (fun env cont ->
if Cancel.isSet env then cont None
else cont (Some (Error e))
)
let result a : IO<'r,'a,'e> =
IO (fun env cont ->
if Cancel.isSet env then cont None
else cont (Some a)
)
let effect f : IO<'r,'a,'e> =
IO (fun env cont ->
if Cancel.isSet env then cont None
else
let a = fst env |> f
if Cancel.isSet env then cont None
else cont (Some a)
)
let map (f:'a->'b) (IO run) : IO<'r,'b,'e> =
IO (fun env cont ->
if Cancel.isSet env then cont None
else run env (fun o ->
if Cancel.isSet env then cont None
else
let b = Option.map (Result.map f) o
if Cancel.isSet env then cont None
else cont b
)
)
let mapError (f:'e->'e2) (IO run) : IO<'r,'a,'e2> =
IO (fun env cont ->
if Cancel.isSet env then cont None
else run env (fun o ->
if Cancel.isSet env then cont None
else
let b = Option.map (Result.mapError f) o
if Cancel.isSet env then cont None
else cont b
)
)
let fromUIO (UIO run:UIO<'r,Result<'a,'e>>) : IO<'r,'a,'e> =
IO (fun env cont ->
if Cancel.isSet env then cont None
else run env (fun o -> cont o)
)
let mapResult (f:Result<'a,'e>->Result<'b,'e2>) (IO run) : IO<'r,'b,'e2> =
IO (fun env cont ->
if Cancel.isSet env then cont None
else run env (fun o ->
if Cancel.isSet env then cont None
else
let b = Option.map f o
if Cancel.isSet env then cont None
else cont b
)
)
let inline private foldM (succ:'a->IO<'r,'b,'e2>)
(err:'e->IO<'r,'b,'e2>)
(IO run) : IO<'r,'b,'e2> =
IO (fun env cont ->
if Cancel.isSet env then cont None
else
run env (fun o ->
if Cancel.isSet env then cont None
else
match o with
| None -> cont None
| Some(Ok a) ->
let (IO run) = succ a
if Cancel.isSet env then cont None
else run env cont
| Some(Error e) ->
let (IO run) = err e
if Cancel.isSet env then cont None
else run env cont
)
)
let private retryOrElseEither (Schedule(initial, update))
(orElse:'e *'s->IO<'r,'b,'e2>)
(io:IO<'r,'a,'e>) : IO<'r,Choice<'a,'b>,'e2> =
let rec loop (state:'s) : IO<'r,Choice<'a,'b>,'e2> =
foldM
(Choice1Of2 >> Ok >> result)
(fun e ->
let u = update(e,state)
u.Bind (fun (Decision(cont,delay,state,_)) ->
if cont then
if delay = 0 then loop state
else Clock.sleep(delay).Bind(fun _ -> loop state)
else
orElse(e,state)
|> map Choice2Of2
)
)
io
initial.Bind loop
let retry (policy:Schedule<'r,'s,'e,'sb>) (io:IO<'r,'a,'e>) : IO<'r,'a,'e> =
retryOrElseEither policy (fst >> Error >> result) io
|> map Choice.merge
let fork (IO run) : UIO<'r,IO<'r,'a,'e>> =
UIO (fun env contFork ->
if Cancel.isSet env then contFork None
else
let mutable o = None
threadpool (fun _ ->
run env (fun a ->
match Interlocked.CompareExchange(&o, Some(Choice1Of2 a), None) with
| Some(Choice2Of2 cont) ->
if Cancel.isSet env then cont None
else cont a
| _ -> ()
)
)
IO (fun env cont ->
match Interlocked.CompareExchange(&o, Some(Choice2Of2 cont), None) with
| Some(Choice1Of2 a) ->
if Cancel.isSet env then cont None
else cont a
| _ -> ()
)
|> Some |> contFork
)
let para (ios:IO<'r,'a,'e>[]) : IO<'r,'a[],'e> =
IO (fun env cont ->
if Cancel.isSet env then cont None
elif Array.isEmpty ios then Ok Array.empty |> Some |> cont
else
let envChild = Cancel.add env
let results = Array.zeroCreate ios.Length
let mutable count = ios.Length
Array.iteri (fun i (IO run) ->
threadpool (fun _ ->
run envChild (fun a ->
match a with
| Some(Ok a) ->
results.[i] <- a
if Cancel.isSet env && Interlocked.Exchange(&count,-1) > 0 then
cont None
elif Interlocked.Decrement(&count) = 0 then
if Cancel.isSet env then cont None
else Ok results |> Some |> cont
| Some(Error e) ->
if Interlocked.Exchange(&count,-1) > 0 then
Cancel.set envChild
if Cancel.isSet env then cont None
else Error e |> Some |> cont
| None ->
if Interlocked.Exchange(&count,-1) > 0 then
cont None
)
)
) ios
)
let paraSum (ios:IO<'r,int,'e>[]) : IO<'r,int,'e> =
IO (fun env cont ->
if Cancel.isSet env then cont None
elif Array.isEmpty ios then Ok 0 |> Some |> cont
else
let envChild = Cancel.add env
let mutable result = 0
let mutable count = ios.Length
Array.iter (fun (IO run) ->
threadpool (fun _ ->
run envChild (fun a ->
match a with
| Some(Ok a) ->
Interlocked.Add(&result, a) |> ignore
if Cancel.isSet env && Interlocked.Exchange(&count,-1) > 0 then
cont None
elif Interlocked.Decrement(&count) = 0 then
if Cancel.isSet env then cont None
else Ok result |> Some |> cont
| Some(Error e) ->
if Interlocked.Exchange(&count,-1) > 0 then
Cancel.set envChild
if Cancel.isSet env then cont None
else Error e |> Some |> cont
| None ->
if Interlocked.Exchange(&count,-1) > 0 then
cont None
)
)
) ios
)
let throttle (n:int) (ios:IO<'r,'a,'e>[]) : IO<'r,'a,'e>[] =
let mutable threads = 0
let mutable indexMax = -1
let mutable index = -1
let runCont = Array.zeroCreate ios.Length
Array.map (fun (IO run) ->
IO (fun env cont ->
runCont.[Interlocked.Increment(&indexMax)] <- Some(run,cont)
if Interlocked.Increment(&threads) <= n then
let rec loop missing =
let missing =
List.choose (fun i ->
match runCont.[i] with
| Some(run,cont) ->
if Cancel.isSet env then cont None
else run env cont
runCont.[i] <- None
None
| None -> Some i
) missing
let i = Interlocked.Increment(&index)
let missing =
if i <= indexMax then
match runCont.[i] with
| Some(run,cont) ->
if Cancel.isSet env then cont None
else run env cont
runCont.[i] <- None
missing
| None -> i::missing
else
Interlocked.Decrement(&index) |> ignore
missing
if missing <> [] || index < indexMax then
loop missing
loop []
Interlocked.Decrement(&threads) |> ignore
)
) ios
let paraFold (folder:'state->'a->'state) (state:'state) (ios:IO<'r,'a,'e>[]) : IO<'r,'state,'e> =
IO (fun env cont ->
if Cancel.isSet env then cont None
elif Array.isEmpty ios then Ok state |> Some |> cont
else
let mutable state = state
let mutable count = ios.Length
let mutable index = 0
let envChild = Cancel.add env
let results = Array.zeroCreate ios.Length
Array.iteri (fun i (IO run) ->
threadpool (fun _ ->
run envChild (fun a ->
match a with
| Some(Ok a) ->
let rec foldSomeMore i =
if i = results.Length then index <- i
elif Cancel.isSet env |> not then
match results.[i] with
| None ->
index <- i
| Some v ->
results.[i] <- None
state <- folder state v
foldSomeMore (i+1)
if i = index && Cancel.isSet env |> not then
state <- folder state a
foldSomeMore (i+1)
else
results.[i] <- Some a
if Interlocked.Decrement(&count) = 0 then
foldSomeMore index
if Cancel.isSet env then cont None
else Ok state |> Some |> cont
| Some(Error e) ->
if Interlocked.Exchange(&count,-1) >= 0 then
if Cancel.isSet env then cont None
else Error e |> Some |> cont
Cancel.set envChild
| None ->
if Interlocked.Exchange(&count,-1) >= 0 then
cont None
)
)
) ios
)
let fold (folder:'state->'a->'state) (state:'state) (ios:IO<'r,'a,'e>[]) : IO<'r,'state,'e> =
IO (fun env cont ->
if Cancel.isSet env then cont None
elif Array.isEmpty ios then Ok state |> Some |> cont
else
let mutable state = state
let mutable count = ios.Length
let rec loop i =
if i = ios.Length then
if Cancel.isSet env then cont None
else Ok state |> Some |> cont
else
let (IO run) = ios.[i]
run env (fun a ->
match a with
| Some(Ok a) ->
if count >= 0 && Cancel.isSet env |> not then
state <- folder state a
loop (i+1)
| Some(Error e) ->
if Interlocked.Exchange(&count,-1) >= 0 then
if Cancel.isSet env then cont None
else Error e |> Some |> cont
| None ->
if Interlocked.Exchange(&count,-1) >= 0 then
cont None
)
loop 0
)
let race (UIO run1) (IO run2) : IO<'r,Choice<'a1,'a2>,'e1> =
IO (fun env cont ->
if Cancel.isSet env then cont None
else
let envChild = Cancel.add env
let mutable o = 0
threadpool (fun _ ->
run1 envChild (fun a ->
if Interlocked.Exchange(&o,1) = 0 then
Cancel.set envChild
if Cancel.isSet env then cont None
else Option.map (Choice2Of2 >> Ok) a |> cont
)
)
threadpool (fun _ ->
run2 envChild (fun a ->
if Interlocked.Exchange(&o,1) = 0 then
Cancel.set envChild
if Cancel.isSet env then cont None
else Option.map (Result.map Choice1Of2) a |> cont
)
)
)
let timeout (milliseconds:int) (io:IO<'r,'a,'e>) : IO<'r,'a,'e option> =
IO (fun env cont ->
let (IO run) = race (Clock.sleep milliseconds) io
run env (fun o ->
if Cancel.isSet env then cont None
else
match o with
| None -> None
| Some(Ok (Choice1Of2 a)) -> Ok a |> Some
| Some(Ok (Choice2Of2 ())) -> Error None |> Some
| Some(Error e) -> Error (Some e) |> Some
|> cont
)
)
let toAsync (env:'r) (IO run) : Async<Result<'a,'e>> =
Async.FromContinuations(fun (cont,_,_) ->
run (env,Cancel.create()) (fun o ->
cont o.Value
)
)
let toAsyncCancel (env:'r,cancel) (IO run) : Async<Result<'a,'e> option> =
Async.FromContinuations(fun (cont,_,_) ->
run (env,cancel) (fun o ->
cont o
)
)
type IOBuilder() =
member inline __.Bind(io:UIO<'r,'a>, f:'a->UIO<'r,'b>) : UIO<'r,'b> = io.Bind f
member inline __.Bind(io:IO<'r,'a,'e>, f:'a->UIO<'r,'b>) : IO<'r,'b,'e> = io.Bind f
member inline __.Bind(io:UIO<'r,'a>, f:'a->IO<'r,'b,'e>) : IO<'r,'b,'e> = io.Bind f
member inline __.Bind<'r,'a,'b,'e>(io:IO<'r,'a,'e>, f:'a->IO<'r,'b,'e>) = io.Bind<'b> f
member inline __.Return value = UIO.result value
member inline __.ReturnFrom value = value
member inline __.Zero() = UIO.result Unchecked.defaultof<_>
[<AutoOpen>]
module IOAutoOpen =
let io = IOBuilder()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment