Skip to content

Instantly share code, notes, and snippets.

@mattbarrett
Created May 5, 2014 18:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattbarrett/2ff14b1346842d403574 to your computer and use it in GitHub Desktop.
Save mattbarrett/2ff14b1346842d403574 to your computer and use it in GitHub Desktop.
public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
return Observable.Create<T>(observer =>
{
var gate = new object();
bool active = false;
var cancelable = new MultipleAssignmentDisposable();
var disposable = source.Materialize().Subscribe(thisNotification =>
{
bool wasNotAlreadyActive;
Notification<T> outsideNotification;
lock (gate)
{
wasNotAlreadyActive = !active;
active = true;
outsideNotification = thisNotification;
}
if (wasNotAlreadyActive)
{
cancelable.Disposable = scheduler.Schedule(self =>
{
Notification<T> localNotification;
lock (gate)
{
localNotification = outsideNotification;
outsideNotification = null;
}
localNotification.Accept(observer);
bool hasPendingNotification;
lock (gate)
{
hasPendingNotification = active = (outsideNotification != null);
}
if (hasPendingNotification)
{
self();
}
});
}
});
return new CompositeDisposable(disposable, cancelable);
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment