Skip to content

Instantly share code, notes, and snippets.

@andybrackley
Last active December 3, 2015 06:39
Show Gist options
  • Save andybrackley/e2b4e90a0577f7e0453c to your computer and use it in GitHub Desktop.
Save andybrackley/e2b4e90a0577f7e0453c to your computer and use it in GitHub Desktop.
Subscribing to an Observable sequence and performing an action ignoring all but latest value
#r @"C:\Users\Andy\Documents\Programming\Projects\F#\ScratchPad\packages\Rx-Core.2.2.5\lib\net45\System.Reactive.Core.dll"
#r @"C:\Users\Andy\Documents\Programming\Projects\F#\ScratchPad\packages\Rx-Interfaces.2.2.5\lib\net45\System.Reactive.Interfaces.dll"
#r @"C:\Users\Andy\Documents\Programming\Projects\F#\ScratchPad\packages\Rx-Linq.2.2.5\lib\net45\System.Reactive.Linq.dll"
#r @"C:\Users\Andy\Documents\Programming\Projects\F#\ScratchPad\packages\FSharp.Control.Reactive.3.2.0\lib\net40\FSharp.Control.Reactive.dll"
open FSharp.Control.Reactive
open System.Reactive.Disposables
open System.Reactive.Linq
open System.Reactive.Subjects
module Observable =
// Some inspiration taken from here:
// http://stackoverflow.com/questions/11010602/with-rx-how-do-i-ignore-all-except-the-latest-value-when-my-subscribe-method-is
let observeOnLatest<'T> (initialVal : 'T) (sourceObserver : System.IObservable<'T>) =
Observable.Create<'T>(fun (obs : System.IObserver<'T>) ->
let bs = new BehaviorSubject<'T>(initialVal)
let dispose =
new CompositeDisposable(
sourceObserver.Multicast(bs).Connect(),
sourceObserver |> Observable.subscribeWithCallbacks (fun _ -> obs.OnNext(bs.Value)) obs.OnError obs.OnCompleted)
Action(fun _ -> dispose.Dispose())
)
let sampleSubscribe<'T> (initialVal : 'T) action (sourceObserver : System.IObservable<'T>) =
let bs = new BehaviorSubject<'T>(initialVal)
new CompositeDisposable(
sourceObserver.Multicast(bs).Connect(),
sourceObserver |> Observable.subscribe ( fun _ -> action bs.Value )
)
type System.IObservable<'T> with
member x.SampleSubscribe(initialVal, action) =
Observable.sampleSubscribe initialVal action x
// Using the SampleSubscribe
let d =
Observable.interval(System.TimeSpan.FromMilliseconds(150.0))
.SampleSubscribe (0L,
(fun i ->
[0 .. 10] |> Seq.iter (fun _ -> System.Threading.Thread.Sleep(100))
printfn "%d" i))
d.Dispose()
// Better version using proper Observable Pattern
let action (i : int64) =
printfn "%d on thread %d" i System.Threading.Thread.CurrentThread.ManagedThreadId
System.Threading.Thread.Sleep(System.TimeSpan.FromSeconds(1.1))
let obs =
Observable.interval(System.TimeSpan.FromMilliseconds(150.0))
|> Observable.observeOnLatest 0L
|> Observable.take 5
|> Observable.subscribeWithCallbacks action (fun err -> printfn "Error: %s" err.Message) (fun() -> printfn "Completed" )
@andybrackley
Copy link
Author

Output:

0
7
14
21
28
35
42

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment