Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
return Observable.Create<T>(observer =>
{
var gate = new object();
bool active = false;
var cancelable = new MultipleAssignmentDisposable();
var disposable = source.Materialize().Subscribe(thisNotification =>
{
bool wasNotAlreadyActive;
Notification<T> outsideNotification;
lock (gate)
{
wasNotAlreadyActive = !active;
active = true;
outsideNotification = thisNotification;
}
if (wasNotAlreadyActive)
{
cancelable.Disposable = scheduler.Schedule(self =>
{
Notification<T> localNotification;
lock (gate)
{
localNotification = outsideNotification;
outsideNotification = null;
}
localNotification.Accept(observer);
bool hasPendingNotification;
lock (gate)
{
hasPendingNotification = active = (outsideNotification != null);
}
if (hasPendingNotification)
{
self();
}
});
}
});
return new CompositeDisposable(disposable, cancelable);
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment