Created
February 6, 2015 14:55
-
-
Save eulerfx/cd02789548f21cc4b5f5 to your computer and use it in GitHub Desktop.
AsyncEvent (an F# async-based take on Rx, the Reactive Extensions)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// An async observer: a function that receives `Some value` for each
/// notification and `None` when the stream is stopped, and returns the
/// asynchronous work that handles that notification.
type AsyncObserver<'a> = Option<'a> -> Async<unit>
module AsyncObserver = | |
/// Posts the value `a` to the observer by wrapping it in `Some`.
let inline post a (obs:AsyncObserver<'a>) =
    a |> Some |> obs
/// Signals the end of the stream to the observer by sending `None`.
let inline stop (obs:AsyncObserver<'a>) =
    None |> obs
/// Adapts an observer of 'a into an observer of 'b by applying `f` to every
/// posted value; a `None` notification is passed through unchanged.
let contramap (f:'b -> 'a) (o:AsyncObserver<'a>) : AsyncObserver<'b> =
    fun notification ->
        notification
        |> Option.map f
        |> o
/// Adapts an observer of 'a into an observer of 'b using an async mapping.
/// Each posted value is first transformed by `f` and the result is posted to
/// `o`; a `None` notification is forwarded to `o` as is.
let contramapAsync (f:'b -> Async<'a>) (o:AsyncObserver<'a>) : AsyncObserver<'b> =
    fun notification ->
        match notification with
        | None -> o None
        | Some b ->
            async {
                let! mapped = f b
                do! o (Some mapped) }
/// Wraps an observer so that it only receives values for which the async
/// predicate `f` yields true. A `None` notification is always forwarded.
let filterAsync (f:'a -> Async<bool>) (o:AsyncObserver<'a>) : AsyncObserver<'a> =
    fun notification ->
        match notification with
        | None -> o None
        | Some a ->
            async {
                let! keep = f a
                if not keep then return ()
                else return! o (Some a) }
/// Wraps an observer so that leading values are dropped for as long as the
/// async predicate `f` yields true. The first value for which `f` yields false
/// is forwarded, and from then on every value is forwarded without consulting
/// `f` again. A `None` notification is always forwarded.
let skipWhileAsync (f:'a -> Async<bool>) (o:AsyncObserver<'a>) : AsyncObserver<'a> =
    // Flips to true once the predicate has failed for the first time.
    // The cell is shared by every notification sent to the returned observer.
    let passing = ref false
    fun notification ->
        match notification with
        | None -> o None
        | Some a ->
            async {
                if passing.Value then
                    do! o (Some a)
                else
                    let! stillSkipping = f a
                    if not stillSkipping then
                        passing.Value <- true
                        do! o (Some a) }
/// Adapts an observer of 'a into an observer of 'b using an async chooser.
/// Values for which `f` yields `Some` are forwarded to `o`; values for which
/// it yields `None` are dropped. Only an incoming `None` notification is
/// forwarded to `o` as `None`.
let contrachooseAsync (f:'b -> Async<'a option>) (o:AsyncObserver<'a>) : AsyncObserver<'b> =
    function
    | None -> o None
    | Some b ->
        async {
            let! chosen = f b
            match chosen with
            // Forward only real values: handing the chooser's `None` to `o`
            // would be indistinguishable from the stop notification.
            | Some _ -> return! o chosen
            | None -> return () }
/// Wraps a synchronous `IObserver` as an async observer. `Some a` calls
/// `OnNext a` and `None` calls `OnCompleted`. The call happens as soon as the
/// observer function is invoked; the returned async is already completed.
let ofSync (obs:System.IObserver<'a>) : AsyncObserver<'a> =
    fun notification ->
        match notification with
        | Some a -> obs.OnNext a
        | None -> obs.OnCompleted()
        async.Return()
/// Creates a synchronous observer which schedules notifications to a worker queue.
/// `OnNext` posts `Some value`; `OnCompleted` posts `None` and then calls the
/// stop function; `OnError` posts `None` and discards the exception.
/// NOTE(review): the queueing itself is done by `Async.toSyncStop`, which is
/// not defined in this file - confirm how `maxQueueSize` and `workers` are used.
let toSync maxQueueSize workers (obs:AsyncObserver<'a>) : System.IObserver<'a> =
    let enqueue, shutdown = Async.toSyncStop maxQueueSize workers obs
    { new System.IObserver<'a> with
        member __.OnNext value =
            enqueue (Some value)
        member __.OnError _ =
            enqueue None
        member __.OnCompleted() =
            enqueue None
            shutdown() }
/// Merges two observers into one by delegating to `AsyncSink.merge`.
/// NOTE(review): the original comment states that one observer is invoked
/// sequentially after the other; that ordering lives in `AsyncSink.merge`,
/// which is not defined in this file - confirm.
let merge (o1:AsyncObserver<'a>) (o2:AsyncObserver<'a>) : AsyncObserver<'a> =
    (o1, o2) ||> AsyncSink.merge
/// An async observable (event): a function that is given an observer and
/// returns the asynchronous work of subscribing that observer.
type AsyncObservable<'a> =
    AsyncObserver<'a> -> Async<unit>
module AsyncObservable = | |
/// Identity helper that marks a subscription function as an `AsyncObservable`.
let inline create (o:AsyncObservable<'a>) : AsyncObservable<'a> =
    o
/// Subscribes the observer to the observable by applying the observable to it.
let inline add (o:AsyncObservable<'a>) (obs:AsyncObserver<'a>) =
    obs |> o
/// Subscribes a value handler to the observable: `f` is run for every value,
/// while the `None` notification is ignored.
let inline subs (f:'a -> Async<unit>) (o:AsyncObservable<'a>) =
    let handler notification =
        match notification with
        | Some a -> f a
        | None -> async.Return()
    add o handler
/// An observable that produces no values: every observer is stopped
/// (sent `None`) as soon as it is added.
let empty<'a> : AsyncObservable<'a> =
    create (fun (obs:AsyncObserver<'a>) -> AsyncObserver.stop obs)
/// An observable that posts the single value `a` to each observer and then
/// stops it.
let singleton a : AsyncObservable<'a> =
    create (fun (obs:AsyncObserver<'a>) ->
        async {
            do! AsyncObserver.post a obs
            do! AsyncObserver.stop obs })
/// Defers construction of an observable: `f` is called anew each time an
/// observer is added, and the observer is added to the observable it returns.
let delay (f:unit -> AsyncObservable<'a>) : AsyncObservable<'a> =
    create (fun obs ->
        let inner = f ()
        inner obs)
/// An observable that repeatedly sleeps for `ms` milliseconds and then posts
/// a unit value to the observer. The loop has no exit condition, so the
/// subscription async never completes on its own and `None` is never sent.
let interval (ms:int) : AsyncObservable<unit> =
    create (fun (obs:AsyncObserver<unit>) ->
        let rec tick () : Async<unit> =
            async {
                do! Async.Sleep ms
                do! obs (Some ())
                return! tick () }
        tick ())
/// Transforms the values of an observable with `f` by wrapping each added
/// observer with `AsyncObserver.contramap f`.
let map (f:'a -> 'b) (o:AsyncObservable<'a>) : AsyncObservable<'b> =
    fun (obs:AsyncObserver<'b>) ->
        obs
        |> AsyncObserver.contramap f
        |> o
/// Transforms the values of an observable with the async function `f` by
/// wrapping each added observer with `AsyncObserver.contramapAsync f`.
let mapAsync (f:'a -> Async<'b>) (o:AsyncObservable<'a>) : AsyncObservable<'b> =
    fun (obs:AsyncObserver<'b>) ->
        obs
        |> AsyncObserver.contramapAsync f
        |> o
/// Flattens an observable of observables. Every inner observable produced by
/// `oo` is subscribed with an observer that forwards its values to `obs`;
/// `obs` is stopped only when the outer observable sends `None`.
let join (oo:AsyncObservable<AsyncObservable<'a>>) : AsyncObservable<'a> =
    create <| fun (obs:AsyncObserver<'a>) ->
        // Observer handed to each inner observable: values pass through, but
        // the inner observable's own completion is swallowed. Forwarding it
        // would stop `obs` after the first inner observable finished and
        // again for every later one.
        let valuesOnly : AsyncObserver<'a> =
            function
            | Some a -> obs (Some a)
            | None -> async.Return()
        add oo <| function
            | Some inner -> inner valuesOnly
            | None -> obs None
/// Maps each value of `o` to an observable with `f` and flattens the result
/// with `join`.
let bind (f:'a -> AsyncObservable<'b>) (o:AsyncObservable<'a>) : AsyncObservable<'b> =
    o
    |> map f
    |> join
/// Builds an observable from an async value: when an observer is added, the
/// result of `a` is passed to `f` and the observer is added to the observable
/// that `f` returns.
/// NOTE(review): sequencing is delegated to `Async.bind`, which is not defined
/// in this file - presumably a monadic bind over `Async`; confirm.
let bindAsync (f:'a -> AsyncObservable<'b>) (a:Async<'a>) : AsyncObservable<'b> =
    create (fun (obs:AsyncObserver<'b>) ->
        a |> Async.bind (fun value -> f value obs))
/// Creates an observable that only passes on the values of `o` for which the
/// async predicate `f` yields true. Each added observer is wrapped with
/// `AsyncObserver.filterAsync` (the module defines no plain `filter`).
let filterAsync (f:'a -> Async<bool>) (o:AsyncObservable<'a>) : AsyncObservable<'a> =
    create <| fun (obs:AsyncObserver<'a>) ->
        add o <| AsyncObserver.filterAsync f obs
/// Creates an observable whose values are produced by running the async
/// chooser `f` over the values of `o`. Each added observer is wrapped with
/// `AsyncObserver.contrachooseAsync` (the module defines no plain
/// `contrachoose`), which decides how the chooser's results are forwarded.
let chooseAsync (f:'a -> Async<'b option>) (o:AsyncObservable<'a>) : AsyncObservable<'b> =
    create <| fun (obs:AsyncObserver<'b>) ->
        add o <| AsyncObserver.contrachooseAsync f obs
/// Returns the most recent value seen before the observable sent `None`, or
/// `None` if no value was seen. Completes only after the stop notification
/// has been received.
let last (o:AsyncObservable<'a>) : Async<'a option> =
    async {
        // TODO: thread safety
        let latest : 'a option ref = ref None
        use completed = new System.Threading.ManualResetEventSlim()
        let observer (notification:'a option) =
            match notification with
            | Some _ -> latest.Value <- notification
            | None -> completed.Set()
            async.Return()
        do! add o observer
        // Wait for the stop notification before reading the captured value.
        do! completed.WaitHandle |> Async.AwaitWaitHandle |> Async.Ignore
        return latest.Value }
/// Returns the first value produced by the observable.
/// Raises an exception (via `failwith`) if the observable sends `None`
/// before producing any value. A `None` received after the first value has
/// been captured is ordinary completion and is ignored.
let first (o:AsyncObservable<'a>) : Async<'a> = async {
    let first : ref<option<'a>> = ref None
    use mre = new System.Threading.ManualResetEventSlim()
    do! add o <| function
        | Some a ->
            // Only the first value is recorded; later values are ignored.
            if not mre.IsSet then
                first.Value <- Some a
                mre.Set()
            async.Return()
        // Completion after a value was captured (e.g. `singleton`) is not an error.
        | None when mre.IsSet -> async.Return()
        | None -> failwith "AsyncObservable.first: the observable completed without producing a value."
    do! mre.WaitHandle |> Async.AwaitWaitHandle |> Async.Ignore
    return first.Value |> Option.get }
/// Creates an observable of running accumulator values. Starting from `b`,
/// each value of `o` is combined with the current accumulator by the async
/// function `f`; the new accumulator is stored and posted to the observer.
/// The accumulator cell is created once per added observer.
let scanAsync (f:'b -> 'a -> Async<'b>) (b:'b) (o:AsyncObservable<'a>) : AsyncObservable<'b> =
    fun (obs:AsyncObserver<'b>) ->
        let state = ref b
        let step (notification:'a option) =
            match notification with
            | None -> obs None
            | Some a ->
                async {
                    let! next = f state.Value a
                    state.Value <- next
                    do! obs (Some next) }
        add o step
/// Folds the observable with the async function `f`, starting from `b`, by
/// taking the last value of `scanAsync f b o`.
/// NOTE(review): `Option.isNull` and `Async.map` are not defined in this file;
/// presumably `Option.isNull b` substitutes the seed `b` when the scan
/// produced no value - confirm against the helper's definition.
let foldAsync (f:'b -> 'a -> Async<'b>) (b:'b) (o:AsyncObservable<'a>) : Async<'b> =
    o
    |> scanAsync f b
    |> last
    |> Async.map (Option.isNull b)
/// Creates an observable which adds each observer to both underlying
/// observables. The two subscriptions are run with `Async.Parallel` and their
/// results are discarded.
let merge (o1:AsyncObservable<'a>) (o2:AsyncObservable<'a>) : AsyncObservable<'a> =
    fun (obs:AsyncObserver<'a>) ->
        [ o1; o2 ]
        |> List.map (fun source -> source obs)
        |> Async.Parallel
        |> Async.Ignore
/// Flattens an observable of sequences into an observable of their elements.
/// For each sequence, one post per element is built and the posts are handed
/// to `Async.withParallelWorkersUnbounded`; `None` is forwarded as is.
/// NOTE(review): `Async.withParallelWorkersUnbounded` is not defined in this
/// file - element ordering and concurrency depend on it; confirm.
let concat (o:AsyncObservable<#seq<'a>>) : AsyncObservable<'a> =
    fun (obs:AsyncObserver<'a>) ->
        add o <| fun notification ->
            match notification with
            | None -> obs None
            | Some items ->
                items
                |> Seq.map (fun item -> obs (Some item))
                |> Async.withParallelWorkersUnbounded
/// Agent that groups enqueued items into arrays and posts each array to the
/// observer side of a subject. A batch is posted when it reaches `batchSize`
/// items, or when a receive times out while the batch is non-empty.
/// `timeout` is the time budget, in milliseconds, for filling one batch.
/// NOTE(review): `subjectSeq` is not defined in this file - presumably it
/// yields an observer/observable pair; confirm.
type AsyncBatchAgent<'a>(batchSize, timeout) =
    let batchObserver,batchObservable = subjectSeq<'a[]>

    /// Allocates an empty segment backed by a fresh array of `batchSize` slots.
    let newSegment() =
        let backing : 'a array = Array.zeroCreate batchSize
        new System.ArraySegment<_>(backing, 0, 0)

    /// Copies the filled part of the segment into a new, exactly-sized array.
    let segmentToArray (segment:System.ArraySegment<'a>) =
        Array.sub segment.Array segment.Offset segment.Count

    /// Stores `item` in the slot just past the segment's end and returns a
    /// segment that is one element longer (the backing array is mutated).
    let expandSegment item (segment:System.ArraySegment<'a>) =
        segment.Array.[segment.Offset + segment.Count] <- item
        new System.ArraySegment<_>(segment.Array, segment.Offset, segment.Count + 1)

    let body (agent:MailboxProcessor<_>) =
        let rec loop remainingTime (messages:System.ArraySegment<_>) = async {
            // A stopwatch, unlike the wall clock, is unaffected by system
            // clock adjustments, so `elapsed` cannot go negative or jump.
            let stopwatch = System.Diagnostics.Stopwatch.StartNew()
            let! msg = agent.TryReceive(timeout = max 0 remainingTime)
            let elapsed = int stopwatch.ElapsedMilliseconds
            match msg with
            | Some msg when messages.Count = batchSize - 1 ->
                // This message fills the batch: publish it and start over.
                do! batchObserver |> AsyncObserver.post (messages |> expandSegment msg |> segmentToArray)
                return! loop timeout (newSegment())
            | Some msg ->
                // Keep collecting, with the time already spent deducted.
                return! loop (remainingTime - elapsed) (messages |> expandSegment msg)
            | None when messages.Count <> 0 ->
                // Timed out with a partial batch: publish what has been collected.
                do! batchObserver |> AsyncObserver.post (messages |> segmentToArray)
                return! loop timeout (newSegment())
            | None ->
                // Timed out with nothing collected: wait again with a full budget.
                return! loop timeout (newSegment()) }
        loop timeout (newSegment())

    let agent = MailboxProcessor.Start body

    /// The observable side of the subject on which batches are posted.
    member __.BatchProduced = batchObservable

    /// Queues an item for inclusion in the current batch.
    member __.Enqueue v = agent.Post v
/// Buffers the values of `o` into arrays using an `AsyncBatchAgent` configured
/// with `count` and the whole-millisecond value of `timeSpan`. The observer is
/// first added to the agent's batch observable, then each value of `o` is
/// enqueued on the agent; `None` from `o` is forwarded straight to `obs`.
/// NOTE(review): on `None` the observer is stopped without waiting for the
/// agent, so a partially filled batch may not have been delivered - confirm
/// whether that is intended.
let bufferWithTimeOrCount (timeSpan:System.TimeSpan) (count:int) (o:AsyncObservable<'a>) : AsyncObservable<'a[]> =
    fun (obs:AsyncObserver<'a[]>) ->
        async {
            let timeoutMs = int timeSpan.TotalMilliseconds
            let agent = new AsyncBatchAgent<_>(count, timeoutMs)
            do! agent.BatchProduced obs
            let feed (notification:'a option) =
                match notification with
                | Some a ->
                    agent.Enqueue a
                    async.Return()
                | None -> obs None
            do! o feed }
/// Creates an observable which wraps every added observer with `f` before
/// adding it to the underlying observable `o`.
let withObserver (f:AsyncObserver<'a> -> AsyncObserver<'a>) (o:AsyncObservable<'a>) : AsyncObservable<'a> =
    fun (obs:AsyncObserver<'a>) ->
        obs
        |> f
        |> o
/// Creates an observable which invokes the specified effect after each
/// observation, by merging `effect` into every added observer through
/// `flip AsyncObserver.merge`.
/// NOTE(review): `flip` is not defined in this file - presumably it swaps the
/// two arguments so that `effect` becomes the second observer; confirm.
let withObserverPostEffect (effect:AsyncObserver<'a>) (o:AsyncObservable<'a>) : AsyncObservable<'a> =
    o |> withObserver (fun obs -> flip AsyncObserver.merge effect obs)
/// Creates an observable which invokes the specified effect before each
/// observation, by merging every added observer with `effect` as the first
/// argument of `AsyncObserver.merge`.
let withObserverPreEffect (effect:AsyncObserver<'a>) (o:AsyncObservable<'a>) : AsyncObservable<'a> =
    o |> withObserver (fun obs -> AsyncObserver.merge effect obs)
/// Exposes an async observable as an `IObservable`. On subscription the
/// synchronous observer is wrapped with `AsyncObserver.ofSync`, added to `o`,
/// and the resulting async is run synchronously, so the subscribe call blocks
/// until it finishes. The function returned for unsubscription does nothing.
/// NOTE(review): `Observable.create` is not defined in this file - confirm it
/// accepts a subscribe function that returns an unsubscribe callback.
let toSync (o:AsyncObservable<'a>) : System.IObservable<'a> =
    Observable.create <| fun obs ->
        let asyncObserver = AsyncObserver.ofSync obs
        add o asyncObserver |> Async.RunSynchronously
        fun () -> ()
/// Exposes an `IObservable` as an async observable. Each added observer is
/// converted with `AsyncObserver.toSync maxBufferSize workers` and subscribed
/// to `o`. The subscription happens as soon as the observer is added, the
/// `IDisposable` it returns is discarded, and the returned async is already
/// completed.
let ofSync (maxBufferSize:int) (workers:int) (o:System.IObservable<'a>) : AsyncObservable<'a> =
    fun (obs:AsyncObserver<'a>) ->
        let syncObserver = AsyncObserver.toSync maxBufferSize workers obs
        o.Subscribe syncObserver |> ignore
        async.Return()
/// Creates an observable from an async sequence such that each observer will
/// iterate through the sequence using the specified degree of concurrency.
/// Every element is posted to the observer via
/// `AsyncSeq.iterAsyncParThrottled`; once the iteration completes the
/// observer is stopped.
let ofAsyncSeq (concurrency:int) (s:AsyncSeq<'a>) : AsyncObservable<'a> =
    fun (obs:AsyncObserver<'a>) ->
        async {
            do! s |> AsyncSeq.iterAsyncParThrottled concurrency (fun item -> obs (Some item))
            do! AsyncObserver.stop obs }
/// Creates an observable from an async sequence. Every element is posted to
/// the observer via `AsyncSeq.iterAsyncPar`; once the iteration completes the
/// observer is stopped.
/// NOTE(review): the name suggests no concurrency limit is applied, but that
/// depends on `AsyncSeq.iterAsyncPar`, which is not defined here - confirm.
let ofAsyncSeqUnbounded (s:AsyncSeq<'a>) : AsyncObservable<'a> =
    fun (obs:AsyncObserver<'a>) ->
        async {
            do! s |> AsyncSeq.iterAsyncPar (fun item -> obs (Some item))
            do! AsyncObserver.stop obs }
/// Converts an observable into an async sequence through a bounded
/// `BlockingCollection` of capacity `maxQueueSize`. Values are added to the
/// collection, `None` marks it complete for adding, and the sequence yields
/// the collection's consuming enumerable.
/// NOTE(review): the subscription is awaited before consumption starts and
/// `Add` blocks when the collection is full - check whether an observable that
/// produces more than `maxQueueSize` values while subscribing can stall here.
let toAsyncSeq (maxQueueSize:int) (o:AsyncObservable<'a>) : AsyncSeq<'a> =
    asyncSeq {
        use buffer = new System.Collections.Concurrent.BlockingCollection<_>(maxQueueSize)
        let enqueue (notification:'a option) =
            match notification with
            | Some a -> buffer.Add a
            | None -> buffer.CompleteAdding()
            async.Return()
        do! add o enqueue
        yield! AsyncSeq.ofSeq (buffer.GetConsumingEnumerable()) }
/// Creates an observable by repeatedly applying the async generator `f` to a
/// state, starting from `s`. While `f` yields `Some (value, nextState)`, the
/// value is posted to the observer and `f nextState` is evaluated, both passed
/// together to `Async.Parallel`; when `f` yields `None` the observer is stopped.
/// NOTE(review): the tuple form of `Async.Parallel` and `Async.bind` are not
/// defined in this file - confirm their semantics.
let unfoldAsync (f:'State -> Async<('a * 'State) option>) (s:'State) : AsyncObservable<'a> =
    fun (obs:AsyncObserver<'a>) ->
        let rec step generated =
            async {
                match generated with
                | None ->
                    do! obs None
                    return ()
                | Some (a, state) ->
                    let! next, _ = Async.Parallel(f state, obs (Some a))
                    return! step next }
        Async.bind step (f s)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment