Skip to content

Instantly share code, notes, and snippets.

@Dorus
Last active September 15, 2015 11:58
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Dorus/92d9d5336eacdea77d8a to your computer and use it in GitHub Desktop.
Save Dorus/92d9d5336eacdea77d8a to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace Server {
static class OverflowQueue_ {
/// <summary>
/// Collect each element in the source sequence and enqueue it on the resulting
/// sequence. If the callback on the resulting sequence does not complete before
/// new elements are received, enqueue up to maximal N elements, and process them
/// within the TimeSpan, decided by filter.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence</param>
/// <param name="filter">The filter function that decide if an element is still relevant.</param>
/// <param name="scheduler">Scheduler to notify observers on.</param>
/// <returns>The resulting sequence.</returns>
public static IObservable<TSource> OverflowQueue<TSource>(this IObservable<TSource> source, Func<TimeSpan, int, Boolean> filter, IScheduler scheduler) {
return Observable.Create<TSource>(o => {
var gate = new Object();
var queue = new ConcurrentQueue<Timestamped<TSource>>();
var running = false;
var completed = false;
Exception error = null;
return source.Timestamp(scheduler).Do(_ => {
Timestamped<TSource> result;
if (queue.TryPeek(out result)) {
var time = scheduler.Now.Subtract(result.Timestamp);
if (!filter(time, queue.Count)) {
// remove oldest element if needed.
queue.TryDequeue(out result);
}
}
})
.Subscribe(el => {
queue.Enqueue(el);
lock (gate) {
if (running) {
return;
}
running = true;
}
Action<Action> runQueue = (self) => {
Timestamped<TSource> result;
while (queue.TryDequeue(out result)) {
var time = scheduler.Now.Subtract(result.Timestamp);
if (filter(time, queue.Count)) {
o.OnNext(result.Value);
}
}
lock (gate) {
if (queue.Count == 0) {
if (error != null) {
o.OnError(error);
}
if (completed) {
o.OnCompleted();
}
running = false;
return;
}
}
self();
};
scheduler.Schedule(runQueue);
}, err => scheduler.Schedule(() => {
lock (gate) {
if (running) {
error = err;
return;
}
}
o.OnError(err);
}), () => scheduler.Schedule(() => {
lock (gate) {
if (running) {
completed = true;
return;
}
}
o.OnCompleted();
}));
});
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment