Last active
May 25, 2023 06:28
-
-
Save aniongithub/c650636189b68d0b1ece3e020bc51329 to your computer and use it in GitHub Desktop.
An extension method on IObservable<T> that lets a slow observer see only the latest value of an observable sequence, skipping values it was too busy to receive. It is based on the solution at http://www.zerobugbuild.com/?p=192, adapted to use non-materialized streams to reduce GC pressure.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Reactive.Concurrency; | |
using System.Reactive.Disposables; | |
using System.Reactive.Linq; | |
using System.Threading; | |
namespace ObserveLatestOn | |
{ | |
/// <summary>
/// Extension methods that let a slow observer receive only the most recent
/// value of an observable sequence instead of queueing every value.
/// </summary>
public static class ObservableExtensions
{
    /// <summary>
    /// Observes only the latest value of <paramref name="source"/>, delivering
    /// notifications on <see cref="Scheduler.Default"/>.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to observe.</param>
    /// <returns>A sequence that drops values superseded before they could be delivered.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static IObservable<T> ObserveLatest<T>(
        this IObservable<T> source)
    {
        return ObserveLatest(source, Scheduler.Default);
    }

    /// <summary>
    /// Observes only the latest value of <paramref name="source"/>, delivering
    /// notifications on <paramref name="scheduler"/>. While the observer is busy
    /// handling a value, newer values overwrite each other so that only the most
    /// recent one is delivered next.
    /// </summary>
    /// <remarks>
    /// Completion and errors from <paramref name="source"/> are forwarded on the
    /// scheduler as well. A value still pending when the source terminates is
    /// delivered first, then the terminal notification follows.
    /// </remarks>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to observe.</param>
    /// <param name="scheduler">
    /// Scheduler used to notify the observer; <see cref="Scheduler.Default"/> is used when null.
    /// </param>
    /// <returns>A sequence that drops values superseded before they could be delivered.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static IObservable<T> ObserveLatest<T>(
        this IObservable<T> source, IScheduler scheduler)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        scheduler = scheduler ?? Scheduler.Default;

        return Observable.Create<T>(observer =>
        {
            var gate = new object();
            var cancelable = new MultipleAssignmentDisposable();

            // State shared between the source's thread and the scheduler; guarded by gate.
            T latestValue = default(T);
            bool hasValue = false;      // latestValue holds an undelivered value
            bool terminated = false;    // source signalled OnCompleted or OnError
            Exception failure = null;   // non-null when the source terminated with an error
            bool active = false;        // a drain is scheduled or currently running

            // Delivers one pending notification per invocation and reschedules
            // itself while more work is pending, so observer calls never overlap.
            Action<Action> drain = self =>
            {
                T current = default(T);
                bool emitValue;
                Exception error = null;
                lock (gate)
                {
                    emitValue = hasValue;
                    if (emitValue)
                    {
                        current = latestValue;
                        latestValue = default(T); // do not keep the delivered value alive
                        hasValue = false;
                    }
                    else
                    {
                        // A drain only runs when work is pending, so with no
                        // value left the pending work is the terminal notification.
                        error = failure;
                    }
                }

                if (!emitValue)
                {
                    // 'active' is intentionally left set: nothing may follow a
                    // terminal notification, so no further drain is ever started.
                    if (error != null)
                    {
                        observer.OnError(error);
                    }
                    else
                    {
                        observer.OnCompleted();
                    }
                    return;
                }

                observer.OnNext(current);

                bool hasPendingNotification;
                lock (gate)
                {
                    hasPendingNotification = active = hasValue || terminated;
                }
                if (hasPendingNotification)
                {
                    self();
                }
            };

            // Records termination (error is null for normal completion) and
            // starts a drain if none is in flight.
            Action<Exception> terminate = error =>
            {
                bool wasNotAlreadyActive;
                lock (gate)
                {
                    failure = error;
                    terminated = true;
                    wasNotAlreadyActive = !active;
                    active = true;
                }
                if (wasNotAlreadyActive)
                {
                    cancelable.Disposable = scheduler.Schedule(drain);
                }
            };

            var disposable = source.Subscribe(
                value =>
                {
                    bool wasNotAlreadyActive;
                    lock (gate)
                    {
                        wasNotAlreadyActive = !active;
                        active = true;
                        latestValue = value; // overwrite any value not yet delivered
                        hasValue = true;
                    }
                    if (wasNotAlreadyActive)
                    {
                        cancelable.Disposable = scheduler.Schedule(drain);
                    }
                },
                terminate,
                () => terminate(null));

            return new CompositeDisposable(disposable, cancelable);
        });
    }
}
/// <summary>
/// Console demo: a 33 ms ticker feeds an observer that takes up to 500 ms per
/// value, showing that ObserveLatest skips the values produced while it is busy.
/// </summary>
class Program
{
    /// <summary>
    /// Starts the demo and blocks forever; terminate the process to stop it.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var random = new Random();

        // Cold sequence: every subscription starts its own timer and runs the
        // logging side effect, so it must be subscribed exactly once.
        var tickObservable = Observable.Interval(TimeSpan.FromMilliseconds(33))
            .Do(x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
                DateTime.Now,
                Thread.CurrentThread.ManagedThreadId, x));

        // The subscription is kept for the lifetime of the process.
        tickObservable
            .ObserveLatest(Scheduler.Default)
            .Subscribe(x =>
            {
                Console.WriteLine("{0} Thread: {1} Observed value: {2}",
                    DateTime.Now,
                    Thread.CurrentThread.ManagedThreadId, x);
                Thread.Sleep(random.Next(500)); // Simulate long work time
            });

        // Keep the process alive without subscribing a second time; calling
        // Wait() on the cold sequence would start another timer and duplicate
        // every "Source value" line.
        Thread.Sleep(Timeout.Infinite);
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment