Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
public static IObservable<IStale<T>> DetectStale<T>(this IObservable<T> source, TimeSpan stalenessPeriod, IScheduler scheduler)
{
return Observable.Create<IStale<T>>(observer =>
{
var timerSubscription = new SerialDisposable();
var observerLock = new object();
Action scheduleStale = () =>
{
timerSubscription.Disposable = Observable
.Timer(stalenessPeriod, scheduler)
.Subscribe(_ =>
{
lock (observerLock)
{
observer.OnNext(new Stale<T>());
}
});
};
var sourceSubscription = source.Subscribe(
x =>
{
// cancel any scheduled stale update
var disposable = timerSubscription.Disposable;
if (disposable != null)
disposable.Dispose();
lock (observerLock)
{
observer.OnNext(new Stale<T>(x));
}
scheduleStale();
},
observer.OnError,
observer.OnCompleted);
scheduleStale();
return new CompositeDisposable { sourceSubscription, timerSubscription };
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment