Last active
December 26, 2015 17:09
-
-
Save gluck/7185093 to your computer and use it in GitHub Desktop.
ObserveLatestOn version
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary> | |
/// Requirements: | |
/// - downstream observer should be called on the provided scheduler | |
/// - downstream observer OnNext shouldn't be called more than once per delay provided | |
/// - downstream observer OnNext should be scheduled at most after the given delay | |
/// - when called, downstream observer OnNext should be given the latest upstream value observed | |
/// - when upstream OnComplete, downstream should be provided the latest value observed before calling OnComplete (ASAP) | |
/// - when upstream OnError, downstream should OnError ASAP | |
/// </summary> | |
public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, TimeSpan delay, IScheduler scheduler) | |
{ | |
return Observable.Create<T>(observer => | |
{ | |
return new X<T>(scheduler, delay, observer).SubscribeFrom(source); | |
}); | |
} | |
private class X<T> : IObserver<T> | |
{ | |
private readonly IObserver<T> observer; | |
private readonly TimeSpan delay; | |
private readonly IScheduler scheduler; | |
private readonly object gate = new object(); | |
private readonly SerialDisposable cancelable = new SerialDisposable(); | |
private bool hasEvent = false; | |
private T evt; | |
public X(IScheduler scheduler, TimeSpan delay, IObserver<T> observer) | |
{ | |
this.scheduler = scheduler; | |
this.delay = delay; | |
this.observer = observer; | |
} | |
public IDisposable SubscribeFrom(IObservable<T> source) | |
{ | |
var disposable = source.Subscribe(this); | |
return new CompositeDisposable(disposable, cancelable); | |
} | |
public void OnCompleted() | |
{ | |
cancelable.Disposable = scheduler.Schedule(() => | |
{ | |
T localEvt; | |
bool localHasEvt = false; | |
lock (gate) | |
{ | |
localEvt = evt; | |
localHasEvt = hasEvent; | |
hasEvent = false; | |
} | |
if (localHasEvt) | |
{ | |
observer.OnNext(localEvt); | |
} | |
observer.OnCompleted(); | |
}); | |
} | |
public void OnError(Exception error) | |
{ | |
lock (gate) | |
{ | |
hasEvent = false; | |
} | |
cancelable.Disposable = scheduler.Schedule(() => | |
{ | |
observer.OnError(error); | |
}); | |
} | |
public void OnNext(T obj) | |
{ | |
bool wasNotAlreadyActive; | |
lock (gate) | |
{ | |
wasNotAlreadyActive = !hasEvent; | |
hasEvent = true; | |
evt = obj; | |
} | |
if (wasNotAlreadyActive) | |
{ | |
cancelable.Disposable = scheduler.Schedule(delay, () => | |
{ | |
T localEvt; | |
bool localHasEvt = false; | |
lock (gate) | |
{ | |
localEvt = evt; | |
localHasEvt = hasEvent; | |
hasEvent = false; | |
} | |
if (localHasEvt) | |
{ | |
observer.OnNext(localEvt); | |
} | |
}); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[Test] | |
public void Test_Delayed_Notif_Get_Latest_Data() | |
{ | |
var sched = new TestScheduler(); | |
var obs = Observable.Interval(TimeSpan.FromMilliseconds(10), sched); | |
var pub = obs.ObserveLatestOn(TimeSpan.FromMilliseconds(100), sched).Publish(); | |
pub.Connect(); | |
List<Notification<long>> ret = new List<Notification<long>>(); | |
pub.Materialize().Subscribe(ret.Add); | |
Assert.AreEqual(0, ret.Count()); | |
sched.AdvanceToMs(11); // first item is produced, inner tasks is scheduled | |
Assert.AreEqual(0, ret.Count()); | |
sched.AdvanceToMs(111); | |
// delayed observer is notified with latest data | |
Assert.AreEqual(1, ret.Count()); | |
Assert.AreEqual(9, ret[0].Value); | |
} | |
[Test] | |
public void Test_With_Two_Schedulers() | |
{ | |
var sched = new TestScheduler(); | |
var schedUI = new TestScheduler(); | |
var obs = Observable.Interval(TimeSpan.FromMilliseconds(10), sched); | |
var pub = obs.ObserveLatestOn(TimeSpan.FromMilliseconds(100), schedUI).Publish(); | |
pub.Connect(); | |
List<Notification<long>> ret = new List<Notification<long>>(); | |
pub.Materialize().Subscribe(ret.Add); | |
Assert.AreEqual(0, ret.Count()); | |
sched.AdvanceToMs(110); | |
schedUI.AdvanceToMs(110); | |
Assert.AreEqual(1, ret.Count()); | |
sched.AdvanceToMs(4000); | |
// UI Thread is busy ... then suddenly | |
schedUI.AdvanceToMs(4000); | |
Assert.AreEqual(2, ret.Count()); | |
Assert.AreEqual(399, ret[1].Value); | |
} | |
[Test] | |
public void Test_Error_Gets_Propagated_ASAP() | |
{ | |
var sched = new TestScheduler(); | |
var obs = Observable.Interval(TimeSpan.FromMilliseconds(10), sched).Take(15).Concat(Observable.Throw<long>(new Exception())); | |
var pub = obs.ObserveLatestOn(TimeSpan.FromMilliseconds(100), sched).Publish(); | |
pub.Connect(); | |
List<Notification<long>> ret = new List<Notification<long>>(); | |
pub.Materialize().Subscribe(ret.Add); | |
sched.AdvanceToMs(111); | |
Assert.AreEqual(1, ret.Count()); | |
Assert.AreEqual(9, ret[0].Value); | |
sched.AdvanceToMs(149); | |
Assert.AreEqual(1, ret.Count()); | |
sched.AdvanceToMs(151); | |
Assert.AreEqual(2, ret.Count()); | |
Assert.AreEqual(NotificationKind.OnError, ret[1].Kind); | |
} | |
[Test] | |
public void Test_Complete_With_Extra_Data() | |
{ | |
var sched = new TestScheduler(); | |
var obs = Observable.Interval(TimeSpan.FromMilliseconds(10), sched).Take(15); | |
var pub = obs.ObserveLatestOn(TimeSpan.FromMilliseconds(100), sched).Publish(); | |
pub.Connect(); | |
List<Notification<long>> ret = new List<Notification<long>>(); | |
pub.Materialize().Subscribe(ret.Add); | |
Assert.AreEqual(0, ret.Count()); | |
sched.AdvanceToMs(11); // first item is produced, inner tasks is scheduled | |
Assert.AreEqual(0, ret.Count()); | |
sched.AdvanceToMs(111); | |
// delayed observer is notified, either with 9/10 | |
Assert.AreEqual(1, ret.Count()); | |
Assert.AreEqual(9, ret[0].Value); | |
sched.AdvanceToMs(149); | |
Assert.AreEqual(1, ret.Count()); | |
sched.AdvanceToMs(151); | |
Assert.AreEqual(3, ret.Count()); | |
Assert.AreEqual(14, ret[1].Value); | |
Assert.AreEqual(NotificationKind.OnCompleted, ret[2].Kind); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment