Skip to content

Instantly share code, notes, and snippets.

@Horusiath

Horusiath/Fiber.fs

Last active Mar 31, 2020
Embed
What would you like to do?
Custom fibers implementation in F#
open System
open System.Threading
type FiberResult<'a> = Result<'a, exn> option
[<Sealed;AllowNullLiteral>]
type Cancel(parent: Cancel) =
let mutable flag: int = 0
let mutable children: Cancel list = []
new() = Cancel(null)
/// Check if token was cancelled
member __.Cancelled = flag = 1
/// Remove child token
member private __.RemoveChild(child) =
let rec loop child =
let children' = children
let nval = children' |> List.filter ((<>) child)
if not (obj.ReferenceEquals(children', Interlocked.CompareExchange(&children, nval, children')))
then loop child
if not (List.isEmpty children) then loop child
/// Create a new child token and return it.
member this.AddChild () =
let rec loop child =
let children' = children
if (obj.ReferenceEquals(children', Interlocked.CompareExchange(&children, child::children', children')))
then child
else loop child
loop (Cancel this)
/// Cancel a token
member this.Cancel() =
if Interlocked.Exchange(&flag, 1) = 0 then
for child in Interlocked.Exchange(&children, []) do child.Cancel()
if not (isNull parent) then parent.RemoveChild(this)
[<Interface>]
type IScheduler =
abstract Schedule: (unit -> unit) -> unit
abstract Delay: TimeSpan * (unit -> unit) -> unit
type Fiber<'a> = Fiber of (IScheduler * Cancel -> (FiberResult<'a> -> unit) -> unit)
[<RequireQualifiedAccess>]
module Fiber =
/// Wraps value into fiber.
let success r = Fiber <| fun (_, c) next -> if c.Cancelled then next None else next (Some (Ok r))
/// Wraps exception into fiber.
let fail e = Fiber <| fun (_, c) next -> if c.Cancelled then next None else next (Some (Error e))
/// Returns a cancelled fiber.
let cancelled<'a> = Fiber <| fun _ next -> next None
/// Returns a fiber, which will delay continuation execution after a given timeout.
let delay timeout =
Fiber <| fun (s, c) next ->
if c.Cancelled then next None
else s.Delay(timeout, fun () ->
if c.Cancelled
then next None
else next (Some (Ok ())))
/// Maps result of Fiber execution to another value and returns new Fiber with mapped value.
let mapResult fn (Fiber call) = Fiber <| fun (s, c) next ->
if c.Cancelled then next None
else
try
call (s, c) (fun result ->
if c.Cancelled then next None
else next (Option.map fn result))
with e -> next (Some (Error e))
/// Maps successful result of Fiber execution to another value and returns new Fiber with mapped value.
let map fn fiber = mapResult (Result.map fn) fiber
/// Allows to recover from exception (if `fn` returns Ok) or recast it (if `fn` returns Error).
let catch fn fiber = mapResult (function Error e -> fn e | other -> other) fiber
let bind fn (Fiber call) = Fiber <| fun (s, c) next ->
if c.Cancelled then next None
else
try
call (s, c) (fun result ->
if c.Cancelled then next None
else match result with
| Some (Ok r) ->
let (Fiber call2) = fn r
call2 (s, c) next
| None -> next None
| Some (Error e) -> next (Some(Error e))
)
with e -> next (Some(Error e))
/// Starts both fibers running in parallel, returning the result from the winner
/// (the one which completed first) while cancelling the other.
let race (Fiber left) (Fiber right): Fiber<Choice<'a, 'b>> =
Fiber <| fun (s, c) next ->
if c.Cancelled then next None
else
let mutable flag = 0
let child = c.AddChild()
let run fiber choice =
s.Schedule (fun () ->
fiber (s, child) (fun result ->
if Interlocked.Exchange(&flag, 1) = 0 then
child.Cancel()
if c.Cancelled then next None
else match result with
| None -> next None
| Some(Ok v) -> next (Some(Ok(choice v)))
| Some(Error e) -> next (Some(Error e))))
run left Choice1Of2
run right Choice2Of2
let timeout (t: TimeSpan) fiber =
Fiber <| fun (s, c) next ->
let (Fiber call) = race (delay t) fiber
call (s, c) (fun result ->
if c.Cancelled then next None
else match result with
| None -> next None
| Some(Ok (Choice1Of2 _)) -> next None // timeout won
| Some(Ok (Choice2Of2 v)) -> next (Some(Ok v))
| Some(Error e) -> next (Some(Error e))
)
/// Executes a bunch of Fiber operations in parallel, returning an Fiber which may contain
/// a gathered set of results or (potential) failures that have happened during the execution.
let parallel fibs =
Fiber <| fun (s, c) next ->
if c.Cancelled then next None
else
let mutable remaining = Array.length fibs
let successes = Array.zeroCreate remaining
let childCancel = c.AddChild()
fibs |> Array.iteri (fun i (Fiber call) ->
s.Schedule (fun () ->
call (s, childCancel) (fun result ->
match result with
| Some (Ok success) ->
successes.[i] <- success
if c.Cancelled && Interlocked.Exchange(&remaining, -1) > 0 then
next None
elif Interlocked.Decrement(&remaining) = 0 then
if c.Cancelled then next None
else next (Some (Ok successes))
| Some (Error fail) ->
if Interlocked.Exchange(&remaining, -1) > 0 then
childCancel.Cancel()
if c.Cancelled then next None
else next (Some (Error fail))
| None ->
if Interlocked.Exchange(&remaining, -1) > 0 then
next None))
)
/// Blocks current execution thread, executing given Fiber, and returning result of execution.
let blocking (s: IScheduler) (cancel: Cancel) (Fiber fn) =
use waiter = new ManualResetEventSlim(false)
let mutable res = None
s.Schedule(fun () -> fn (s, cancel) (fun result ->
if not cancel.Cancelled then
Interlocked.Exchange(&res, Some result) |> ignore
waiter.Set()))
waiter.Wait()
res.Value
/// Converts given Fiber into F# Async.
let toAsync s (Fiber call) = Async.FromContinuations <| fun (onSuccess, onError, onCancel) ->
call (s, Cancel()) <| fun result ->
match result with
| None -> onCancel (OperationCanceledException "")
| Some (Ok value) -> onSuccess value
| Some (Error e) -> onError e
[<RequireQualifiedAccess>]
module Scheduler =
/// Default environment, which is backed by .NET Thread pool.
let shared =
{ new IScheduler with
member __.Schedule fn = System.Threading.ThreadPool.QueueUserWorkItem(WaitCallback (ignore>>fn)) |> ignore
member __.Delay (timeout: TimeSpan, fn) =
let mutable t = Unchecked.defaultof<Timer>
let callback = fun _ ->
t.Dispose()
fn()
()
t <- new Timer(callback, null, int timeout.TotalMilliseconds, Timeout.Infinite)
}
type TestScheduler(now: DateTime) =
let mutable running = false
let mutable currentTime = now.Ticks
let mutable timeline = Map.empty
let schedule delay fn =
let at = currentTime + delay
timeline <-
match Map.tryFind at timeline with
| None -> Map.add at [fn] timeline
| Some fns -> Map.add at (fn::fns) timeline
let rec run () =
match Seq.tryHead timeline with
| None -> running <- false
| Some (KeyValue(time, bucket)) ->
timeline <- Map.remove time timeline
currentTime <- time
for fn in List.rev bucket do
fn ()
run ()
member __.UtcNow () = DateTime(currentTime)
interface IScheduler with
member this.Schedule fn =
schedule 0L fn
if not running then
running <- true
run ()
member this.Delay (timeout: TimeSpan, fn) = schedule timeout.Ticks fn
let test(cancel, fiber) =
let s = TestScheduler(DateTime.UtcNow)
Fiber.blocking s cancel fiber
let test(cancel, fiber) =
let s = TestScheduler(DateTime.UtcNow)
Fiber.blocking s cancel fiber
[<Struct>]
type FiberBuilder =
member inline __.Zero = Fiber.success (Unchecked.defaultof<_>)
member inline __.ReturnFrom fib = fib
member inline __.Return value = Fiber.success value
member inline __.Bind(fib, fn) = Fiber.bind fn fib
[<AutoOpen>]
module FiberBuilder =
let fib = FiberBuilder()
//---------------------
// run some actual code
//---------------------
let inline millis n = TimeSpan.FromMilliseconds (float n)
let program = fib {
let a = fib {
do! Fiber.delay (millis 5000)
return 3
}
let! b = a |> Fiber.timeout (millis 3000)
return b }
[<EntryPoint>]
let main argv =
let cancel = Cancel ()
let result = Scheduler.test(cancel, program)
printfn "Result: %A" result
Console.ReadLine ()
0 // return an integer exit code
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment