Skip to content

Instantly share code, notes, and snippets.

@eulerfx
Created February 6, 2015 14:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eulerfx/cd02789548f21cc4b5f5 to your computer and use it in GitHub Desktop.
Save eulerfx/cd02789548f21cc4b5f5 to your computer and use it in GitHub Desktop.
AsyncEvent (an F# async based Rx)
/// An async observer: a function invoked once per notification.
/// `Some a` carries the next value and `None` is the stop signal
/// (this is exactly what `AsyncObserver.post` and `AsyncObserver.stop` send).
/// The returned `Async<unit>` is the observer's handling of that notification.
type AsyncObserver<'a> = 'a option -> Async<unit>
/// Functions for notifying, adapting and converting `AsyncObserver` values.
/// Throughout, `Some a` is a value notification and `None` is the stop signal.
module AsyncObserver =

  /// Notifies the observer of the value `a`.
  let inline post a (obs:AsyncObserver<'a>) = obs (Some a)

  /// Sends the stop signal (`None`) to the observer.
  let inline stop (obs:AsyncObserver<'a>) = obs None

  /// Adapts an observer of `'a` into an observer of `'b` by mapping each value
  /// through `f` before it reaches `o`. The stop signal passes through unchanged.
  let contramap (f:'b -> 'a) (o:AsyncObserver<'a>) : AsyncObserver<'b> =
    o << Option.map f

  /// Like `contramap`, but the mapping function is asynchronous.
  let contramapAsync (f:'b -> Async<'a>) (o:AsyncObserver<'a>) : AsyncObserver<'b> =
    function
    | Some b ->
      async {
        let! a = f b
        do! o (Some a)
      }
    | None -> o None

  /// Forwards to `o` only the values for which the async predicate `f` yields `true`.
  /// Rejected values are dropped; the stop signal is always forwarded.
  let filterAsync (f:'a -> Async<bool>) (o:AsyncObserver<'a>) : AsyncObserver<'a> =
    function
    | Some a ->
      async {
        let! keep = f a
        if keep then
          return! o (Some a)
        else
          return ()
      }
    | None -> o None

  /// Drops values while the async predicate `f` yields `true`; from the first value
  /// for which it yields `false` onwards, every value is forwarded to `o` and `f` is
  /// no longer consulted. The stop signal is always forwarded.
  /// The returned observer carries mutable state that is not synchronized.
  let skipWhileAsync (f:'a -> Async<bool>) (o:AsyncObserver<'a>) : AsyncObserver<'a> =
    // `true` once the skipping phase is over (i.e. the predicate has failed once).
    let skipped = ref false
    function
    | Some a ->
      async {
        if skipped.Value then
          do! o (Some a)
        else
          let! skip = f a
          if not skip then
            skipped.Value <- true
            do! o (Some a)
      }
    | None -> o None

  /// Adapts an observer of `'a` into an observer of `'b` using an async chooser:
  /// when `f` yields `Some a` the value is forwarded to `o`, when it yields `None`
  /// the input is dropped. The stop signal is always forwarded.
  let contrachooseAsync (f:'b -> Async<'a option>) (o:AsyncObserver<'a>) : AsyncObserver<'b> =
    function
    | Some b ->
      async {
        let! chosen = f b
        match chosen with
        | Some a -> return! o (Some a)
        // A rejected element must NOT be passed on as `None`: to the observer
        // `None` is the stop signal, so that would end the stream prematurely.
        | None -> return ()
      }
    | None -> o None

  /// Wraps a synchronous `IObserver` as an async observer.
  /// `OnNext`/`OnCompleted` are called when the observer function is applied,
  /// not when the returned (already completed) async is run.
  let ofSync (obs:System.IObserver<'a>) : AsyncObserver<'a> =
    function
    | Some a -> obs.OnNext a ; async.Return()
    | None -> obs.OnCompleted() ; async.Return()

  /// Creates a synchronous observer which schedules notifications to a worker queue.
  let toSync maxQueueSize workers (obs:AsyncObserver<'a>) : System.IObserver<'a> =
    let post,stop = Async.toSyncStop maxQueueSize workers obs
    { new System.IObserver<'a> with
        member __.OnCompleted() =
          post None
          stop()
        member __.OnNext a = post (Some a)
        // The exception is discarded because `AsyncObserver` has no error channel;
        // the error is reported downstream as a plain stop signal.
        // NOTE(review): unlike OnCompleted this does not call `stop()` - confirm
        // against `Async.toSyncStop` whether the workers should be stopped here too.
        member __.OnError _ = post None }

  /// Merges two observers into one such that one is invoked sequentially after the other.
  let merge (o1:AsyncObserver<'a>) (o2:AsyncObserver<'a>) : AsyncObserver<'a> =
    AsyncSink.merge o1 o2
/// An async observable (event): a function which, given an observer, returns the
/// `Async<unit>` workflow that subscribes that observer (see `AsyncObservable.add`,
/// which is plain function application).
type AsyncObservable<'a> = AsyncObserver<'a> -> Async<unit>
/// Functions for creating, transforming and consuming `AsyncObservable` values.
/// Observers receive `Some a` for each value and `None` as the stop signal.
module AsyncObservable =

  /// Treats a subscription function as an observable (identity; only fixes the type).
  let inline create o : AsyncObservable<'a> = o

  /// Subscribes the observer `obs` to the observable `o`.
  let inline add (o:AsyncObservable<'a>) (obs:AsyncObserver<'a>) = o obs

  /// Subscribes the handler `f` to the values of `o`; the stop signal is ignored.
  let inline subs (f:'a -> Async<unit>) (o:AsyncObservable<'a>) =
    add o <| function Some a -> f a | None -> async.Return()

  /// An observable which immediately sends the stop signal.
  let empty<'a> : AsyncObservable<'a> =
    create <| fun obs -> obs None

  /// An observable which sends the single value `a` followed by the stop signal.
  let singleton a : AsyncObservable<'a> =
    create <| fun obs ->
      async {
        do! obs <| Some a
        do! obs <| None
      }

  /// Defers construction of the observable until an observer is added.
  let delay (f:unit -> AsyncObservable<'a>) : AsyncObservable<'a> =
    create <| fun obs -> f() obs

  /// An observable which sleeps `ms` milliseconds and then notifies the observer
  /// with `()`, repeatedly. It never sends the stop signal.
  let interval (ms:int) : AsyncObservable<unit> =
    create <| fun (obs:AsyncObserver<unit>) ->
      async {
        while true do
          do! Async.Sleep ms
          do! obs (Some ())
      }

  /// Maps each value of `o` through `f`.
  let map (f:'a -> 'b) (o:AsyncObservable<'a>) : AsyncObservable<'b> =
    o << AsyncObserver.contramap f

  /// Maps each value of `o` through the async function `f`.
  let mapAsync (f:'a -> Async<'b>) (o:AsyncObservable<'a>) : AsyncObservable<'b> =
    o << AsyncObserver.contramapAsync f

  /// Flattens an observable of observables: the observer is subscribed to every
  /// inner observable and receives the stop signal when the outer one stops.
  let join (oo:AsyncObservable<AsyncObservable<'a>>) : AsyncObservable<'a> =
    create <| fun (obs:AsyncObserver<'a>) ->
      // Inner observables only contribute values. Their own stop signals are
      // swallowed; otherwise the first inner observable to finish would stop the
      // downstream observer while further values are still to come.
      let inner : AsyncObserver<'a> =
        function
        | Some a -> obs (Some a)
        | None -> async.Return()
      add oo <| function
        | None -> obs None
        | Some o -> o inner

  /// Maps each value to an observable and flattens the result (`map` then `join`).
  let bind (f:'a -> AsyncObservable<'b>) (o:AsyncObservable<'a>) : AsyncObservable<'b> =
    map f o |> join

  /// Builds an observable from the result of the async computation `a`:
  /// on subscription `a` is bound and the observer is added to `f` of its result.
  let bindAsync (f:'a -> AsyncObservable<'b>) (a:Async<'a>) : AsyncObservable<'b> =
    create <| fun (obs:AsyncObserver<'b>) -> Async.bind (fun a -> f a obs) a

  /// Filters the values of `o` with the async predicate `f`
  /// by wrapping each observer with `AsyncObserver.filterAsync`.
  let filterAsync (f:'a -> Async<bool>) (o:AsyncObservable<'a>) : AsyncObservable<'a> =
    create <| fun (obs:AsyncObserver<'a>) ->
      add o <| AsyncObserver.filterAsync f obs

  /// Applies the async chooser `f` to the values of `o`
  /// by wrapping each observer with `AsyncObserver.contrachooseAsync`.
  let chooseAsync (f:'a -> Async<'b option>) (o:AsyncObservable<'a>) : AsyncObservable<'b> =
    create <| fun (obs:AsyncObserver<'b>) ->
      add o <| AsyncObserver.contrachooseAsync f obs

  /// Subscribes to `o`, waits for the stop signal and returns the last value seen,
  /// or `None` when no value was produced.
  let last (o:AsyncObservable<'a>) : Async<'a option> =
    async {
      // TODO: thread safety
      let latest : ref<option<'a>> = ref None
      use mre = new System.Threading.ManualResetEventSlim()
      let observer : AsyncObserver<'a> =
        function
        | Some a ->
          latest.Value <- Some a
          async.Return()
        | None ->
          mre.Set()
          async.Return()
      do! add o observer
      do! mre.WaitHandle |> Async.AwaitWaitHandle |> Async.Ignore
      return latest.Value
    }

  /// Subscribes to `o` and returns the first value it produces.
  /// Raises (via `failwith`) when the observable stops without producing a value.
  let first (o:AsyncObservable<'a>) : Async<'a> =
    async {
      // TODO: thread safety
      let captured : ref<option<'a>> = ref None
      use mre = new System.Threading.ManualResetEventSlim()
      let observer : AsyncObserver<'a> =
        function
        | Some a ->
          // Only the first notification is recorded; later ones are ignored.
          if not mre.IsSet then
            captured.Value <- Some a
            mre.Set()
          async.Return()
        | None ->
          // A stop signal after the first value is normal and must not fail.
          // When no value was seen it still has to release the waiter below,
          // which then reports the empty stream.
          mre.Set()
          async.Return()
      do! add o observer
      do! mre.WaitHandle |> Async.AwaitWaitHandle |> Async.Ignore
      match captured.Value with
      | Some a -> return a
      | None -> return failwith "AsyncObservable.first: the observable completed without producing a value."
    }

  /// Threads a state through the values of `o` using the async folder `f`, starting
  /// at `b`, and emits every intermediate state. Each subscription gets its own
  /// (unsynchronized) state cell.
  let scanAsync (f:'b -> 'a -> Async<'b>) (b:'b) (o:AsyncObservable<'a>) : AsyncObservable<'b> =
    fun (obs:AsyncObserver<'b>) ->
      let state = ref b
      add o <| function
        | None -> obs None
        | Some a ->
          async {
            let! next = f state.Value a
            state.Value <- next
            do! obs (Some next)
          }

  /// Folds the values of `o` with the async folder `f`, starting at `b`
  /// (the last state emitted by `scanAsync`).
  // NOTE(review): `Option.isNull` is not visible in this file; from the types it
  // must map `'b option` to `'b`, presumably falling back to `b` when the stream
  // was empty - confirm against its definition.
  let foldAsync (f:'b -> 'a -> Async<'b>) (b:'b) (o:AsyncObservable<'a>) : Async<'b> =
    scanAsync f b o |> last |> Async.map (Option.isNull b)

  /// Creates an observable which adds observers to the two underlying observables.
  /// Values of both are forwarded; the stop signal is forwarded once, after both
  /// underlying observables have stopped.
  let merge (o1:AsyncObservable<'a>) (o2:AsyncObservable<'a>) : AsyncObservable<'a> =
    fun (obs:AsyncObserver<'a>) ->
      // Number of sources that have not stopped yet. Decremented atomically because
      // the two subscriptions run in parallel.
      let remaining = ref 2
      let shared : AsyncObserver<'a> =
        function
        | Some a -> obs (Some a)
        | None ->
          if System.Threading.Interlocked.Decrement(remaining) = 0 then obs None
          else async.Return()
      Async.Parallel [ o1 shared ; o2 shared ] |> Async.Ignore

  /// Flattens an observable of sequences: the elements of each sequence are sent to
  /// the observer through `Async.withParallelWorkersUnbounded`.
  let concat (o:AsyncObservable<#seq<'a>>) : AsyncObservable<'a> =
    fun (obs:AsyncObserver<'a>) ->
      add o <| function
        | Some aa -> aa |> Seq.map (Some >> obs) |> Async.withParallelWorkersUnbounded
        | None -> obs None

  /// A mailbox-backed agent which groups enqueued items into arrays: a batch is
  /// published when it holds `batchSize` items or when `timeout` milliseconds have
  /// elapsed with at least one item pending, whichever comes first.
  type AsyncBatchAgent<'a>(batchSize, timeout) =
    let batchObserver,batchObservable = subjectSeq<'a[]>

    /// An empty segment over a fresh backing array of `batchSize` slots.
    let newSegment() =
      let array : 'a array = Array.zeroCreate batchSize
      new System.ArraySegment<_>(array, 0, 0)

    /// Copies the occupied part of the segment into a new array.
    let segmentToArray (segment:System.ArraySegment<'a>) =
      let array : 'a[] = Array.zeroCreate segment.Count
      System.Array.Copy(segment.Array, segment.Offset, array, 0, segment.Count)
      array

    /// Stores `item` in the next free slot and returns the segment grown by one.
    let expandSegment item (segment:System.ArraySegment<'a>) =
      segment.Array.[segment.Count] <- item
      new System.ArraySegment<_>(segment.Array, segment.Offset, segment.Count + 1)

    let body (agent:MailboxProcessor<_>) =
      // `remainingTime` is the time budget (ms) left for the batch being collected.
      let rec loop remainingTime (messages:System.ArraySegment<_>) =
        async {
          // A stopwatch is used for the elapsed time because wall-clock time
          // (`DateTime.Now`) can jump on clock adjustments / DST changes.
          let stopwatch = System.Diagnostics.Stopwatch.StartNew()
          let! msg = agent.TryReceive(timeout = max 0 remainingTime)
          let elapsed = int stopwatch.ElapsedMilliseconds
          match msg with
          | Some msg when messages.Count = batchSize - 1 ->
            // This item fills the batch: publish it and start over.
            do! batchObserver |> AsyncObserver.post (messages |> expandSegment msg |> segmentToArray)
            return! loop timeout (newSegment())
          | Some msg ->
            return! loop (remainingTime - elapsed) (messages |> expandSegment msg)
          | None when messages.Count <> 0 ->
            // Timed out with pending items: publish the partial batch.
            do! batchObserver |> AsyncObserver.post (messages |> segmentToArray)
            return! loop timeout (newSegment())
          | None ->
            return! loop timeout (newSegment())
        }
      loop timeout (newSegment())

    let agent = MailboxProcessor.Start body

    /// The observable on which completed batches are published.
    member __.BatchProduced = batchObservable

    /// Queues an item for batching.
    member __.Enqueue v = agent.Post v

  /// Buffers the values of `o` into arrays of at most `count` items, publishing a
  /// buffer when it is full or when `timeSpan` has elapsed with items pending.
  // NOTE(review): on the stop signal `obs None` is sent directly; items still
  // pending in the batch agent at that point do not appear to be flushed first.
  let bufferWithTimeOrCount (timeSpan:System.TimeSpan) (count:int) (o:AsyncObservable<'a>) : AsyncObservable<'a[]> =
    fun (obs:AsyncObserver<'a[]>) ->
      async {
        let batch = new AsyncBatchAgent<_>(count, int timeSpan.TotalMilliseconds)
        do! batch.BatchProduced obs
        let feeder : AsyncObserver<'a> =
          function
          | Some a ->
            batch.Enqueue a
            async.Return()
          | None -> obs None
        do! o <| feeder
      }

  /// Creates an observable which wraps observers.
  let withObserver (f:AsyncObserver<'a> -> AsyncObserver<'a>) (o:AsyncObservable<'a>) : AsyncObservable<'a> =
    fun (obs:AsyncObserver<'a>) ->
      add o <| f obs

  /// Creates an observable which invokes the specified effect after each observation.
  let withObserverPostEffect (effect:AsyncObserver<'a>) (o:AsyncObservable<'a>) : AsyncObservable<'a> =
    withObserver ((flip AsyncObserver.merge) effect) o

  /// Creates an observable which invokes the specified effect before each observation.
  let withObserverPreEffect (effect:AsyncObserver<'a>) (o:AsyncObservable<'a>) : AsyncObservable<'a> =
    withObserver (AsyncObserver.merge effect) o

  /// Exposes the observable as a synchronous `IObservable`.
  /// The subscription workflow is run with `Async.RunSynchronously` before the
  /// subscribe call returns, and the returned unsubscribe action does nothing.
  let toSync (o:AsyncObservable<'a>) : System.IObservable<'a> =
    Observable.create <| fun obs ->
      AsyncObserver.ofSync obs
      |> add o
      |> Async.RunSynchronously
      fun() -> ()

  /// Wraps a synchronous `IObservable` as an async observable; notifications are
  /// handed to observers through `AsyncObserver.toSync maxBufferSize workers`.
  /// `Subscribe` is called when an observer is applied, and the resulting
  /// subscription handle is discarded, so the subscription cannot be cancelled.
  let ofSync (maxBufferSize:int) (workers:int) (o:System.IObservable<'a>) : AsyncObservable<'a> =
    AsyncObserver.toSync maxBufferSize workers
    >> o.Subscribe
    >> ignore
    >> async.Return

  /// Creates an observable from an async sequence such that each observer will iterate through the sequence using the specified degree of concurrency.
  let ofAsyncSeq (concurrency:int) (s:AsyncSeq<'a>) : AsyncObservable<'a> =
    fun (obs:AsyncObserver<'a>) ->
      async {
        do! AsyncSeq.iterAsyncParThrottled concurrency (Some >> obs) s
        do! obs None
      }

  /// Creates an observable from an async sequence; each observer iterates the
  /// sequence with `AsyncSeq.iterAsyncPar` and then receives the stop signal.
  let ofAsyncSeqUnbounded (s:AsyncSeq<'a>) : AsyncObservable<'a> =
    fun (obs:AsyncObserver<'a>) ->
      async {
        do! AsyncSeq.iterAsyncPar (Some >> obs) s
        do! obs None
      }

  /// Converts the observable into an async sequence through a bounded buffer of
  /// `maxQueueSize` items.
  // NOTE(review): the subscription workflow is awaited before the buffer is read.
  // If that workflow pushes more than `maxQueueSize` items before it returns,
  // `buffer.Add` will block with no consumer running - confirm the intended sources.
  let toAsyncSeq (maxQueueSize:int) (o:AsyncObservable<'a>) : AsyncSeq<'a> =
    asyncSeq {
      use buffer = new System.Collections.Concurrent.BlockingCollection<_>(maxQueueSize)
      let observer : AsyncObserver<'a> =
        function
        | Some a ->
          buffer.Add a
          async.Return()
        | None ->
          buffer.CompleteAdding()
          async.Return()
      do! add o observer
      yield! buffer.GetConsumingEnumerable() |> AsyncSeq.ofSeq
    }

  /// Creates an observable by repeatedly applying the async generator `f` to a
  /// state, starting at `s`. Each generated value is sent to the observer while the
  /// next step is computed; when `f` yields `None` the stop signal is sent.
  let unfoldAsync (f:'State -> Async<('a * 'State) option>) (s:'State) : AsyncObservable<'a> =
    fun (obs:AsyncObserver<'a>) ->
      let rec go s =
        async {
          match s with
          | Some (a,s) ->
            // Notify the observer and compute the next step together.
            let! s',_ = Async.Parallel(f s, obs (Some a))
            return! go s'
          | None ->
            do! obs None
            return ()
        }
      f s |> Async.bind go
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment