Create a gist now

Instantly share code, notes, and snippets.

/// <summary>
/// Rate-limits <paramref name="source"/> so that values are published at most once per
/// <paramref name="minimumUpdatePeriod"/>. A value that arrives too soon after the previous
/// publication is held back and published once the period has elapsed; if newer values arrive
/// while one is being held back, only the most recent value is published.
/// </summary>
/// <typeparam name="T">The type of the elements in the sequence.</typeparam>
/// <param name="source">The sequence to conflate.</param>
/// <param name="minimumUpdatePeriod">The minimum time between two published values.</param>
/// <param name="scheduler">
/// The scheduler used to observe the source, to read the current time and to run delayed publications.
/// </param>
/// <returns>
/// A sequence publishing the values of <paramref name="source"/>, spaced at least
/// <paramref name="minimumUpdatePeriod"/> apart. Completion is deferred until a held-back value
/// has been published; an error is forwarded immediately and discards any held-back value.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="scheduler"/> is null.
/// </exception>
public static IObservable<T> Conflate<T>(this IObservable<T> source, TimeSpan minimumUpdatePeriod, IScheduler scheduler)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (scheduler == null)
    {
        throw new ArgumentNullException("scheduler");
    }

    return Observable.Create<T>(observer =>
    {
        // Guards all mutable state below and serializes every notification sent to the observer,
        // because the delayed publication may run concurrently with the source notifications.
        var gate = new object();
        // When the last value was published.
        var lastUpdateTime = DateTimeOffset.MinValue;
        // Handle of the delayed publication, so that it can be cancelled on error or unsubscription.
        var updateScheduled = new MultipleAssignmentDisposable();
        // True while a delayed publication is outstanding.
        var updatePending = false;
        // The most recent value received while a delayed publication is outstanding.
        var latest = default(T);
        // True when the source completed while a delayed publication was outstanding
        // (we cannot complete immediately, the held-back value must be published first).
        var completionRequested = false;

        var subscription = source
            .ObserveOn(scheduler)
            .Subscribe(
                value =>
                {
                    lock (gate)
                    {
                        if (updatePending)
                        {
                            // The outstanding publication picks up the newest value; its due time
                            // (lastUpdateTime + minimumUpdatePeriod) is unaffected by this arrival.
                            latest = value;
                            return;
                        }

                        if (scheduler.Now - lastUpdateTime >= minimumUpdatePeriod)
                        {
                            observer.OnNext(value);
                            lastUpdateTime = scheduler.Now;
                            return;
                        }

                        latest = value;
                        updatePending = true;
                        updateScheduled.Disposable = scheduler.Schedule(lastUpdateTime + minimumUpdatePeriod, () =>
                        {
                            lock (gate)
                            {
                                if (!updatePending)
                                {
                                    // Cancelled by an error before this action got to run.
                                    return;
                                }

                                updatePending = false;
                                var pendingValue = latest;
                                // Do not keep the published value alive longer than needed.
                                latest = default(T);

                                observer.OnNext(pendingValue);
                                lastUpdateTime = scheduler.Now;

                                if (completionRequested)
                                {
                                    observer.OnCompleted();
                                }
                            }
                        });
                    }
                },
                error =>
                {
                    lock (gate)
                    {
                        // Errors are forwarded right away; the held-back value is dropped.
                        updatePending = false;
                        latest = default(T);
                        updateScheduled.Dispose();
                        observer.OnError(error);
                    }
                },
                () =>
                {
                    lock (gate)
                    {
                        if (updatePending)
                        {
                            // Complete once the delayed publication has run. Deciding this under
                            // the gate ensures the publication cannot slip in between the check
                            // and the flag being set.
                            completionRequested = true;
                        }
                        else
                        {
                            observer.OnCompleted();
                        }
                    }
                });

        // Unsubscribing must also cancel an outstanding delayed publication.
        return () =>
        {
            subscription.Dispose();
            updateScheduled.Dispose();
        };
    });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment