Last active
December 3, 2015 06:39
-
-
Save andybrackley/e2b4e90a0577f7e0453c to your computer and use it in GitHub Desktop.
Subscribing to an Observable sequence and performing an action ignoring all but latest value
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#r @"C:\Users\Andy\Documents\Programming\Projects\F#\ScratchPad\packages\Rx-Core.2.2.5\lib\net45\System.Reactive.Core.dll" | |
#r @"C:\Users\Andy\Documents\Programming\Projects\F#\ScratchPad\packages\Rx-Interfaces.2.2.5\lib\net45\System.Reactive.Interfaces.dll" | |
#r @"C:\Users\Andy\Documents\Programming\Projects\F#\ScratchPad\packages\Rx-Linq.2.2.5\lib\net45\System.Reactive.Linq.dll" | |
#r @"C:\Users\Andy\Documents\Programming\Projects\F#\ScratchPad\packages\FSharp.Control.Reactive.3.2.0\lib\net40\FSharp.Control.Reactive.dll" | |
open FSharp.Control.Reactive | |
open System.Reactive.Disposables | |
open System.Reactive.Linq | |
open System.Reactive.Subjects | |
/// Helpers for consuming an observable while only ever looking at the most
/// recent value it has produced.
module Observable =
    // Some inspiration taken from here:
    // http://stackoverflow.com/questions/11010602/with-rx-how-do-i-ignore-all-except-the-latest-value-when-my-subscribe-method-is

    /// Builds an observable that, for every notification of `sourceObserver`,
    /// emits the value currently held by an internal `BehaviorSubject`
    /// (seeded with `initialVal`) instead of the notification's own value.
    /// Errors and completion of the source are forwarded unchanged.
    ///
    /// NOTE(review): the source is subscribed to twice (once to feed the
    /// subject, once to trigger emissions). With a cold source this presumably
    /// means two independent runs of the sequence - confirm this is intended.
    let observeOnLatest<'T> (initialVal : 'T) (sourceObserver : System.IObservable<'T>) =
        Observable.Create<'T>(fun (obs : System.IObserver<'T>) ->
            // Holds the latest value seen on the first subscription.
            let bs = new BehaviorSubject<'T>(initialVal)
            let dispose =
                new CompositeDisposable(
                    // First subscription: keeps bs.Value up to date.
                    sourceObserver.Multicast(bs).Connect(),
                    // Second subscription: each notification re-emits bs.Value.
                    sourceObserver |> Observable.subscribeWithCallbacks (fun _ -> obs.OnNext(bs.Value)) obs.OnError obs.OnCompleted)
            // Fully qualified: the `System` namespace is not opened in this
            // file, so a bare `Action` would not resolve.
            System.Action(fun () -> dispose.Dispose())
        )

    /// Subscribes `action` to `sourceObserver`; on each source notification
    /// `action` is invoked with the current value of an internal
    /// `BehaviorSubject` (seeded with `initialVal`) rather than with the
    /// notification's own value.
    ///
    /// Returns a `CompositeDisposable` holding both subscriptions; disposing
    /// it tears both down.
    let sampleSubscribe<'T> (initialVal : 'T) action (sourceObserver : System.IObservable<'T>) =
        let bs = new BehaviorSubject<'T>(initialVal)
        new CompositeDisposable(
            // Keeps bs.Value up to date.
            sourceObserver.Multicast(bs).Connect(),
            // Runs the action against whatever bs currently holds.
            sourceObserver |> Observable.subscribe (fun _ -> action bs.Value)
        )
/// Method-style access to `Observable.sampleSubscribe` on any `IObservable<'T>`.
type System.IObservable<'T> with
    /// Forwards to `Observable.sampleSubscribe initialVal action`, using this
    /// observable as the source, and returns the resulting disposable.
    member source.SampleSubscribe(initialVal, action) =
        source |> Observable.sampleSubscribe initialVal action
// Using the SampleSubscribe extension: a 150 ms ticker feeding a handler that
// sleeps eleven times for 100 ms before printing the value it was given.
let d =
    let ticks = Observable.interval (System.TimeSpan.FromMilliseconds(150.0))
    ticks.SampleSubscribe(
        0L,
        (fun i ->
            for _ in 0 .. 10 do
                System.Threading.Thread.Sleep(100)
            printfn "%d" i))

// Tear the subscriptions down again.
d.Dispose()
// Better version using proper Observable Pattern | |
/// Prints `i` together with the managed id of the calling thread, then blocks
/// that thread for 1.1 seconds.
let action (i : int64) =
    let threadId = System.Threading.Thread.CurrentThread.ManagedThreadId
    printfn "%d on thread %d" i threadId
    let pause = System.TimeSpan.FromSeconds(1.1)
    System.Threading.Thread.Sleep(pause)
/// Subscription that runs `action` on a 150 ms ticker passed through
/// `Observable.observeOnLatest` (seeded with 0L), limited to five
/// notifications; errors and completion are reported on stdout.
let obs =
    let ticks = Observable.interval (System.TimeSpan.FromMilliseconds(150.0))
    let onError (err : exn) = printfn "Error: %s" err.Message
    let onCompleted () = printfn "Completed"
    ticks
    |> Observable.observeOnLatest 0L
    |> Observable.take 5
    |> Observable.subscribeWithCallbacks action onError onCompleted
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Output: