Skip to content

Instantly share code, notes, and snippets.

@aniongithub
Last active May 25, 2023 06:28
Show Gist options
  • Save aniongithub/c650636189b68d0b1ece3e020bc51329 to your computer and use it in GitHub Desktop.
Save aniongithub/c650636189b68d0b1ece3e020bc51329 to your computer and use it in GitHub Desktop.
An extension method on Observable to observe the latest value of an Observable. This was based on a solution from http://www.zerobugbuild.com/?p=192, adapted to use non-materialized streams to reduce GC pressure.
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
namespace ObserveLatestOn
{
public static class ObservableExtensions
{
public static IObservable<T> ObserveLatest<T>(
this IObservable<T> source)
{
return ObserveLatest(source, Scheduler.Default);
}
public static IObservable<T> ObserveLatest<T>(
this IObservable<T> source, IScheduler scheduler)
{
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<T>(observer =>
{
T newValue = default(T);
bool validNewValue = false;
var gate = new object();
bool active = false;
var cancelable = new MultipleAssignmentDisposable();
var disposable = source.Subscribe(value =>
{
bool wasNotAlreadyActive = false;
lock (gate)
{
wasNotAlreadyActive = !active;
active = true;
newValue = value;
validNewValue = true;
}
if (wasNotAlreadyActive)
{
cancelable.Disposable = scheduler.Schedule(self =>
{
T currValue = default(T);
lock (gate)
{
currValue = newValue;
newValue = default(T);
validNewValue = false;
}
observer.OnNext(currValue);
bool hasPendingNotification = false;
lock (gate)
{
hasPendingNotification = active = validNewValue;
}
if (hasPendingNotification)
{
self();
}
});
}
});
return new CompositeDisposable(disposable, cancelable);
});
}
}
class Program
{
static void Main(string[] args)
{
var random = new Random();
var tickObservable = Observable.Interval(TimeSpan.FromMilliseconds(33))
.Do(x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x));
tickObservable
.ObserveLatest(Scheduler.Default)
.Subscribe(x =>
{
Console.WriteLine("{0} Thread: {1} Observed value: {2}",
DateTime.Now,
Thread.CurrentThread.ManagedThreadId, x);
Thread.Sleep(random.Next(500)); // Simulate long work time
});
tickObservable.Wait();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment