Skip to content

Instantly share code, notes, and snippets.

@mattbarrett
Created May 5, 2014 18:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattbarrett/05b2df37b57241dfd707 to your computer and use it in GitHub Desktop.
Save mattbarrett/05b2df37b57241dfd707 to your computer and use it in GitHub Desktop.
/// <summary>
/// Wraps every element of <paramref name="source"/> in a <c>Stale&lt;T&gt;</c> created with the
/// element, and emits a <c>Stale&lt;T&gt;</c> created with the parameterless constructor whenever
/// <paramref name="stalenessPeriod"/> elapses without a new element (including the period
/// directly after subscription).
/// </summary>
/// <typeparam name="T">Element type of the source sequence.</typeparam>
/// <param name="source">Sequence to monitor.</param>
/// <param name="stalenessPeriod">Quiet time after which a stale marker is emitted.</param>
/// <param name="scheduler">Scheduler used to run the staleness timer.</param>
/// <returns>
/// A sequence of fresh-value and stale-marker notifications. Errors and completion of
/// <paramref name="source"/> are forwarded, and no stale marker is emitted after them.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="scheduler"/> is <c>null</c>.
/// </exception>
public static IObservable<IStale<T>> DetectStale<T>(this IObservable<T> source, TimeSpan stalenessPeriod, IScheduler scheduler)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    return Observable.Create<IStale<T>>(observer =>
    {
        var timerSubscription = new SerialDisposable();
        var observerLock = new object();

        // Incremented every time a timer is (re)scheduled; a timer callback only emits when
        // its captured version is still the current one.
        var timerVersion = 0L;

        // Set once the source has errored or completed; nothing may be emitted afterwards.
        var terminated = false;

        // Must be invoked while holding observerLock.
        Action scheduleStale = () =>
        {
            if (terminated)
            {
                return;
            }

            var version = ++timerVersion;

            // Assigning to the SerialDisposable disposes the previously scheduled timer.
            timerSubscription.Disposable = Observable
                .Timer(stalenessPeriod, scheduler)
                .Subscribe(_ =>
                {
                    lock (observerLock)
                    {
                        // A timer that fired while a fresh value was being delivered (or
                        // after termination) is out of date and must stay silent.
                        if (terminated || version != timerVersion)
                        {
                            return;
                        }

                        observer.OnNext(new Stale<T>());
                    }
                });
        };

        var sourceSubscription = source.Subscribe(
            x =>
            {
                lock (observerLock)
                {
                    observer.OnNext(new Stale<T>(x));

                    // Restart the staleness window; this also invalidates and cancels any
                    // timer scheduled for the previous value.
                    scheduleStale();
                }
            },
            ex =>
            {
                // Terminal notifications share the gate with the timer so they can never
                // run concurrently with, or be followed by, a stale marker.
                lock (observerLock)
                {
                    terminated = true;
                    timerSubscription.Dispose();
                    observer.OnError(ex);
                }
            },
            () =>
            {
                lock (observerLock)
                {
                    terminated = true;
                    timerSubscription.Dispose();
                    observer.OnCompleted();
                }
            });

        // Start the initial staleness window (a no-op if the source already terminated
        // synchronously during Subscribe).
        lock (observerLock)
        {
            scheduleStale();
        }

        return new CompositeDisposable { sourceSubscription, timerSubscription };
    });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment