Instantly share code, notes, and snippets.

View Async.Race.fs
let race (a:Async<'a>) (b:Async<'a>) : Async<'a * Async<'a>> = async {
return!
Async.FromContinuations <| fun (ok,err,cnc) ->
let state = ref 0
let iv = new TaskCompletionSource<_>()
let ok a =
if (Interlocked.CompareExchange(state, 1, 0) = 0) then
ok (a, iv.Task |> Async.AwaitTask)
else
iv.SetResult a
View Async.Cache.fs
let cache (a:Async<'a>) : Async<'a> =
let tcs = TaskCompletionSource<'a>()
let state = ref 0
async {
if (Interlocked.CompareExchange(state, 1, 0) = 0) then
Async.StartWithContinuations(
a,
tcs.SetResult,
tcs.SetException,
(fun _ -> tcs.SetCanceled()))
View Async.Timeout.fs
let timeoutNone (timeoutMs:int) (a:Async<'a>) : Async<'a option> = async {
let! ct = Async.CancellationToken
let res = TaskCompletionSource<_>()
use cts = CancellationTokenSource.CreateLinkedTokenSource ct
res.Task.ContinueWith (fun _ -> cts.Cancel ()) |> ignore
use timer = new Timer((fun _ -> res.TrySetResult None |> ignore), null, timeoutMs, Timeout.Infinite)
Async.StartThreadPoolWithContinuations (
a,
(fun a -> res.TrySetResult (Some a) |> ignore),
(fun e -> res.TrySetException e |> ignore),
View Async.WithCancellation.fs
let withCancellation (ct:CancellationToken) (a:Async<'a>) : Async<'a> = async {
let! ct2 = Async.CancellationToken
use cts = CancellationTokenSource.CreateLinkedTokenSource (ct, ct2)
let tcs = new TaskCompletionSource<'a>()
use _reg = cts.Token.Register (fun () -> tcs.TrySetCanceled() |> ignore)
let a = async {
try
let! a = a
tcs.TrySetResult a |> ignore
with ex ->
View EventStore.fs
// https://github.com/fsprojects/FSharp.Control.AsyncSeq
// RetryPolicy from https://github.com/jet/kafunk/blob/master/src/kafunk/Utility/Faults.fs#L89
/// Returns an Async computation which evaluates the input computation until the specified condition is met
/// with delays between attempts dictated by the specified retry policy.
let pollUntil (rp:RetryPolicy) (condition:'a -> bool) (a:Async<'a>) : Async<'a option> =
(AsyncSeq.replicateInfiniteAsync a, RetryPolicy.delayStream rp)
||> AsyncSeq.interleaveChoice
|> AsyncSeq.tryPick (function Choice1Of2 a when condition a -> Some a | _ -> None)
View poisson.fs
open System
open System.Threading
let private disposable (dispose:unit -> unit) =
{ new IDisposable with member __.Dispose () = dispose () }
/// Creates an observable which triggers based on intervals specified by the input sequence.
let ofDelays (delays:TimeSpan seq) : IObservable<DateTimeOffset> =
{ new IObservable<_> with
member __.Subscribe obs =
View Async.ParallelThrottledIgnore.fs
let ParallelThrottledIgnore (startOnCallingThread:bool) (parallelism:int) (xs:seq<Async<_>>) = async {
let! ct = Async.CancellationToken
let sm = new SemaphoreSlim(parallelism)
let count = ref 1
let res = TaskCompletionSource<_>()
let tryWait () =
try sm.Wait () ; true
with _ -> false
let tryComplete () =
if Interlocked.Decrement count = 0 then
View learner.fs
/// A learner for function type 'a -> 'b.
type Learner<'p, 'a, 'b> = {
/// An function parameterized by 'p implementing 'a -> 'b.
i : 'p * 'a -> 'b
/// Updates the parameter 'p based on training pair ('a,'b).
u : 'p * 'a * 'b -> 'p
/// Requests an input 'a based on parameter 'p and training pair ('a,'b).
View cell.fs
type ExecContext () =
class
// TODO: scheduler, storage
end
[<AbstractClass>]
type Cont<'a> () =
abstract RunCont : ExecContext * 'a -> unit
[<AbstractClass>]
View universal_construction.fs
open Marvel
open System.Collections.Concurrent
open System.Collections.Generic
type Pid = int
type Op = {
op : string
sn : SN
}