This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
let race (a:Async<'a>) (b:Async<'a>) : Async<'a * Async<'a>> = async { | |
return! | |
Async.FromContinuations <| fun (ok,err,cnc) -> | |
let state = ref 0 | |
let iv = new TaskCompletionSource<_>() | |
let ok a = | |
if (Interlocked.CompareExchange(state, 1, 0) = 0) then | |
ok (a, iv.Task |> Async.AwaitTask) | |
else | |
iv.SetResult a |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
let timeoutNone (timeoutMs:int) (a:Async<'a>) : Async<'a option> = async { | |
let! ct = Async.CancellationToken | |
let res = TaskCompletionSource<_>() | |
use cts = CancellationTokenSource.CreateLinkedTokenSource ct | |
res.Task.ContinueWith (fun _ -> cts.Cancel ()) |> ignore | |
use timer = new Timer((fun _ -> res.TrySetResult None |> ignore), null, timeoutMs, Timeout.Infinite) | |
Async.StartThreadPoolWithContinuations ( | |
a, | |
(fun a -> res.TrySetResult (Some a) |> ignore), | |
(fun e -> res.TrySetException e |> ignore), |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
let cache (a:Async<'a>) : Async<'a> = | |
let tcs = TaskCompletionSource<'a>() | |
let state = ref 0 | |
async { | |
if (Interlocked.CompareExchange(state, 1, 0) = 0) then | |
Async.StartWithContinuations( | |
a, | |
tcs.SetResult, | |
tcs.SetException, | |
(fun _ -> tcs.SetCanceled())) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// https://github.com/fsprojects/FSharp.Control.AsyncSeq | |
// RetryPolicy from https://github.com/jet/kafunk/blob/master/src/kafunk/Utility/Faults.fs#L89 | |
/// Builds an Async computation that evaluates `a` repeatedly, interleaving the
/// attempts with the delays produced by `RetryPolicy.delayStream rp`, and
/// completes with the first result for which `condition` holds.
/// Yields `None` when the interleaved sequence ends without a matching result
/// (presumably once the retry policy is exhausted - confirm against the
/// semantics of `AsyncSeq.interleaveChoice`).
let pollUntil (rp:RetryPolicy) (condition:'a -> bool) (a:Async<'a>) : Async<'a option> =
  // Unbounded stream of evaluations of the input computation.
  let attempts = AsyncSeq.replicateInfiniteAsync a
  // Stream of delays dictated by the retry policy.
  let delays = RetryPolicy.delayStream rp
  // Only an attempt (Choice1Of2) that satisfies the condition is accepted;
  // delay ticks (Choice2Of2) and non-matching attempts are passed over.
  let accept item =
    match item with
    | Choice1Of2 result when condition result -> Some result
    | _ -> None
  AsyncSeq.interleaveChoice attempts delays
  |> AsyncSeq.tryPick accept
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// port of https://github.com/mausch/Fleece | |
/// Single-case union whose only case shares the type's name.
/// Presumably a marker used for overload resolution, as in Fleece - confirm against usage.
type ToJsonClass =
  | ToJsonClass

/// Single-case union whose only case shares the type's name.
/// Presumably a marker used for overload resolution, as in Fleece - confirm against usage.
type FromJsonClass =
  | FromJsonClass

/// Result of a parse: `Choice1Of2` carries the parsed value and `Choice2Of2`
/// carries a string (presumably the error message - confirm against callers).
type ParseResult<'a> = Choice<'a, string>
module ParseResult = | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open System | |
open System.Threading | |
/// Wraps a callback in an `IDisposable` whose `Dispose` invokes that callback.
/// No once-only guard is applied: the callback runs on every `Dispose` call.
let private disposable (dispose:unit -> unit) : IDisposable =
  { new IDisposable with
      member __.Dispose () =
        dispose () }
/// Creates an observable which triggers based on intervals specified by the input sequence. | |
let ofDelays (delays:TimeSpan seq) : IObservable<DateTimeOffset> = | |
{ new IObservable<_> with | |
member __.Subscribe obs = |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// An inventory item. | |
[<RequireQualifiedAccess>] | |
module InventoryItem | |
/// The state of an inventory item.
type State =
  { /// Active flag of the item.
    isActive : bool }
  /// The zero state, in which `isActive` is false.
  static member Zero : State = { isActive = false }
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// A learner for function type 'a -> 'b. | |
type Learner<'p, 'a, 'b> = { | |
/// A function parameterized by 'p implementing 'a -> 'b.
i : 'p * 'a -> 'b | |
/// Updates the parameter 'p based on training pair ('a,'b). | |
u : 'p * 'a * 'b -> 'p | |
/// Requests an input 'a based on parameter 'p and training pair ('a,'b). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Execution context handed to continuations. It currently carries no members.
// TODO: scheduler, storage
type ExecContext () = class end
/// Abstract continuation that accepts a value of type 'a.
[<AbstractClass>]
type Cont<'a> () =
  /// Runs this continuation with the execution context and the value.
  abstract member RunCont : ExecContext * 'a -> unit
[<AbstractClass>] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// A probability: a float expected to lie between 0 and 1 inclusive.
/// The range is a convention; the alias itself does not enforce it.
type Prob = float

/// A distribution: a sequence of outcome-probability pairs whose
/// probabilities are expected to sum to 1.
type Dist<'a> =
  | D of seq<'a * Prob>

/// A spread: a function that takes a list of elements and assigns them
/// probabilities, producing a distribution.
type Spread<'a> = list<'a> -> Dist<'a>