Skip to content

Instantly share code, notes, and snippets.

@gluck
Last active December 26, 2015 17:09
Show Gist options
  • Save gluck/7185093 to your computer and use it in GitHub Desktop.
Save gluck/7185093 to your computer and use it in GitHub Desktop.
ObserveLatestOn version
/// <summary>
/// Forwards the notifications of <paramref name="source"/> to the subscriber through
/// <paramref name="scheduler"/>, keeping only the most recent value while a delivery is pending.
/// Requirements:
/// - downstream observer should be called on the provided scheduler
/// - downstream observer OnNext shouldn't be called more than once per delay provided
/// - downstream observer OnNext should be scheduled at most after the given delay
/// - when called, downstream observer OnNext should be given the latest upstream value observed
/// - when upstream OnComplete, downstream should be provided the latest value observed before calling OnComplete (ASAP)
/// - when upstream OnError, downstream should OnError ASAP
/// </summary>
/// <typeparam name="T">Type of the elements in the sequence.</typeparam>
/// <param name="source">Upstream sequence to observe.</param>
/// <param name="delay">Delay between the first pending value and its delivery downstream.</param>
/// <param name="scheduler">Scheduler on which every downstream notification is scheduled.</param>
/// <returns>A sequence that creates one conflating observer per subscription.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="scheduler"/> is null.
/// </exception>
public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, TimeSpan delay, IScheduler scheduler)
{
    // Fail at call time rather than with a NullReferenceException at subscribe/schedule time.
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    // A fresh X<T> per subscription keeps the pending-value state private to each subscriber.
    return Observable.Create<T>(observer => new X<T>(scheduler, delay, observer).SubscribeFrom(source));
}
/// <summary>
/// Observer that remembers only the latest upstream value and hands it to the downstream
/// observer from actions scheduled on the supplied scheduler.
/// </summary>
private class X<T> : IObserver<T>
{
    private readonly IObserver<T> observer;
    private readonly TimeSpan delay;
    private readonly IScheduler scheduler;

    // Guards hasEvent/evt, which are written by upstream callbacks and read by scheduled actions.
    private readonly object gate = new object();

    // Holds the most recently scheduled action; assigning a new one disposes the previous one.
    private readonly SerialDisposable cancelable = new SerialDisposable();

    // True while evt holds a value that has not been delivered downstream yet.
    private bool hasEvent;
    private T evt;

    /// <summary>Creates the observer.</summary>
    /// <param name="scheduler">Scheduler used for every downstream notification.</param>
    /// <param name="delay">Delay applied before a pending value is delivered.</param>
    /// <param name="observer">Downstream observer to notify.</param>
    public X(IScheduler scheduler, TimeSpan delay, IObserver<T> observer)
    {
        this.scheduler = scheduler;
        this.delay = delay;
        this.observer = observer;
    }

    /// <summary>Subscribes this instance to <paramref name="source"/>.</summary>
    /// <param name="source">Upstream sequence.</param>
    /// <returns>A disposable covering both the upstream subscription and the scheduled action.</returns>
    public IDisposable SubscribeFrom(IObservable<T> source)
    {
        var subscription = source.Subscribe(this);
        return new CompositeDisposable(subscription, cancelable);
    }

    /// <summary>
    /// Schedules, without delay, delivery of any pending value followed by OnCompleted.
    /// </summary>
    public void OnCompleted()
    {
        // Replaces any delayed delivery still waiting: the flush below takes over the pending value.
        cancelable.Disposable = scheduler.Schedule(() =>
        {
            DeliverPending();
            observer.OnCompleted();
        });
    }

    /// <summary>
    /// Drops any pending value and schedules, without delay, OnError downstream.
    /// </summary>
    /// <param name="error">Upstream error to forward.</param>
    public void OnError(Exception error)
    {
        lock (gate)
        {
            hasEvent = false;
            // Release the dropped value so it is not kept alive by this observer.
            evt = default(T);
        }

        cancelable.Disposable = scheduler.Schedule(() =>
        {
            observer.OnError(error);
        });
    }

    /// <summary>
    /// Records <paramref name="obj"/> as the latest value and, when no value was already
    /// pending, schedules a delivery after the configured delay.
    /// </summary>
    /// <param name="obj">Value received from upstream.</param>
    public void OnNext(T obj)
    {
        bool nothingWasPending;
        lock (gate)
        {
            nothingWasPending = !hasEvent;
            hasEvent = true;
            evt = obj;
        }

        // When a value was already pending, the delivery scheduled for it will pick up this newer one.
        if (nothingWasPending)
        {
            cancelable.Disposable = scheduler.Schedule(delay, () =>
            {
                DeliverPending();
            });
        }
    }

    // Takes the pending value (if any) under the lock, then calls downstream OnNext outside the lock.
    private void DeliverPending()
    {
        T pending;
        lock (gate)
        {
            if (!hasEvent)
            {
                return;
            }

            pending = evt;
            hasEvent = false;
            // Do not retain a reference to a value that has already been handed over.
            evt = default(T);
        }

        observer.OnNext(pending);
    }
}
/// <summary>
/// With a 10 ms source and a 100 ms delay on the same scheduler, the first downstream
/// notification carries the value 9.
/// </summary>
[Test]
public void Test_Delayed_Notif_Get_Latest_Data()
{
    var sched = new TestScheduler();
    var obs = Observable.Interval(TimeSpan.FromMilliseconds(10), sched);
    var pub = obs.ObserveLatestOn(TimeSpan.FromMilliseconds(100), sched).Publish();
    pub.Connect();

    List<Notification<long>> ret = new List<Notification<long>>();
    pub.Materialize().Subscribe(ret.Add);
    Assert.AreEqual(0, ret.Count);

    sched.AdvanceToMs(11); // first item is produced, inner task is scheduled
    Assert.AreEqual(0, ret.Count);

    sched.AdvanceToMs(111);
    // delayed observer is notified with latest data
    Assert.AreEqual(1, ret.Count);
    Assert.AreEqual(9, ret[0].Value);
}
/// <summary>
/// The source runs on one scheduler and delivery on another; when the delivery scheduler
/// catches up after a long stall, a single notification with the latest value is produced.
/// </summary>
[Test]
public void Test_With_Two_Schedulers()
{
    var sched = new TestScheduler();
    var schedUI = new TestScheduler();
    var obs = Observable.Interval(TimeSpan.FromMilliseconds(10), sched);
    var pub = obs.ObserveLatestOn(TimeSpan.FromMilliseconds(100), schedUI).Publish();
    pub.Connect();

    List<Notification<long>> ret = new List<Notification<long>>();
    pub.Materialize().Subscribe(ret.Add);
    Assert.AreEqual(0, ret.Count);

    sched.AdvanceToMs(110);
    schedUI.AdvanceToMs(110);
    Assert.AreEqual(1, ret.Count);

    sched.AdvanceToMs(4000);
    // UI Thread is busy ... then suddenly
    schedUI.AdvanceToMs(4000);
    Assert.AreEqual(2, ret.Count);
    Assert.AreEqual(399, ret[1].Value);
}
/// <summary>
/// An upstream error after 15 items is forwarded at ~150 ms, without waiting for the
/// delay and without delivering the values received since the last notification.
/// </summary>
[Test]
public void Test_Error_Gets_Propagated_ASAP()
{
    var sched = new TestScheduler();
    var obs = Observable.Interval(TimeSpan.FromMilliseconds(10), sched).Take(15).Concat(Observable.Throw<long>(new Exception()));
    var pub = obs.ObserveLatestOn(TimeSpan.FromMilliseconds(100), sched).Publish();
    pub.Connect();

    List<Notification<long>> ret = new List<Notification<long>>();
    pub.Materialize().Subscribe(ret.Add);

    sched.AdvanceToMs(111);
    Assert.AreEqual(1, ret.Count);
    Assert.AreEqual(9, ret[0].Value);

    // Just before the source terminates: nothing new yet.
    sched.AdvanceToMs(149);
    Assert.AreEqual(1, ret.Count);

    // Just after: the error is the only additional notification.
    sched.AdvanceToMs(151);
    Assert.AreEqual(2, ret.Count);
    Assert.AreEqual(NotificationKind.OnError, ret[1].Kind);
}
/// <summary>
/// When the source completes after 15 items, the last value (14) is delivered
/// immediately, followed by the completion notification.
/// </summary>
[Test]
public void Test_Complete_With_Extra_Data()
{
    var sched = new TestScheduler();
    var obs = Observable.Interval(TimeSpan.FromMilliseconds(10), sched).Take(15);
    var pub = obs.ObserveLatestOn(TimeSpan.FromMilliseconds(100), sched).Publish();
    pub.Connect();

    List<Notification<long>> ret = new List<Notification<long>>();
    pub.Materialize().Subscribe(ret.Add);
    Assert.AreEqual(0, ret.Count);

    sched.AdvanceToMs(11); // first item is produced, inner task is scheduled
    Assert.AreEqual(0, ret.Count);

    sched.AdvanceToMs(111);
    // delayed observer is notified; the assertion below expects 9
    Assert.AreEqual(1, ret.Count);
    Assert.AreEqual(9, ret[0].Value);

    // Just before the source completes: nothing new yet.
    sched.AdvanceToMs(149);
    Assert.AreEqual(1, ret.Count);

    // Just after: pending value 14 is flushed, then OnCompleted.
    sched.AdvanceToMs(151);
    Assert.AreEqual(3, ret.Count);
    Assert.AreEqual(14, ret[1].Value);
    Assert.AreEqual(NotificationKind.OnCompleted, ret[2].Kind);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment