Skip to content

Instantly share code, notes, and snippets.

Last active October 22, 2015 21:24
Show Gist options
  • Save andreaskoepf/e771f66720300ae143c8 to your computer and use it in GitHub Desktop.
Save andreaskoepf/e771f66720300ae143c8 to your computer and use it in GitHub Desktop.
Rx JobQueue
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
namespace Xamla.Utilities
public interface IJobQueue
IObservable<Notification<Unit>> WhenJobCompletes { get; }
IObservable<Unit> WhenQueueEmpty { get; }
IObservable<Exception> WhenJobFails { get; }
int RunningCount { get; }
int QueuedCount { get; }
IObservable<Unit> Add(Action action);
IObservable<Unit> Add(Func<IObservable<Unit>> asyncStart);
bool StartNext();
int StartUpTo(int maxConcurrentlyRunning);
void CancelOutstandingJobs();
public class ManualJobQueue
: IJobQueue
protected struct Job
public Func<IObservable<Unit>> AsyncStart;
public AsyncSubject<Unit> CompletionHandler;
public BooleanDisposable Cancel;
public SerialDisposable JobSubscription;
ConcurrentQueue<Job> queue;
int runningCount;
Subject<Notification<Unit>> whenJobCompletes;
Subject<Unit> whenQueueEmpty;
IObservable<Exception> whenJobFails;
public ManualJobQueue()
queue = new ConcurrentQueue<Job>();
whenJobCompletes = new Subject<Notification<Unit>>();
whenQueueEmpty = new Subject<Unit>();
// whenJobFailes subscription
whenJobFails = whenJobCompletes.Where(n => n.Kind == NotificationKind.OnError)
.Select(n => n.Exception)
// whenQueueEmpty subscription
whenJobCompletes.Subscribe(n =>
int queueCount = queue.Count;
if (Interlocked.Decrement(ref runningCount) == 0 && queueCount == 0)
whenQueueEmpty.OnNext(new Unit());
#region IJobQueue Implementation
public IObservable<Notification<Unit>> WhenJobCompletes
get { return whenJobCompletes.AsObservable(); }
public IObservable<Unit> WhenQueueEmpty
get { return whenQueueEmpty.AsObservable(); }
public IObservable<Exception> WhenJobFails
get { return whenJobFails; }
public int RunningCount
get { return runningCount; }
public int QueuedCount
get { return queue.Count; }
public IObservable<Unit> Add(Action action)
return Add(Observable.ToAsync(action));
public virtual IObservable<Unit> Add(Func<IObservable<Unit>> asyncStart)
Job job = new Job
AsyncStart = asyncStart,
CompletionHandler = new AsyncSubject<Unit>(),
Cancel = new BooleanDisposable(),
JobSubscription = new SerialDisposable()
var cancelable = Observable.Create<Unit>(o =>
new CompositeDisposable(
.Where(n => n.Kind == NotificationKind.OnCompleted || n.Kind == NotificationKind.OnError)
.Subscribe(whenJobCompletes.OnNext); // pass on errors and completions
return cancelable;
public bool StartNext()
Job job;
if (TryDequeNextJob(out job))
Interlocked.Increment(ref runningCount);
return true;
return false;
public int StartUpTo(int maxConcurrentlyRunning)
int started = 0;
for (;;)
for (;;)
int running = 0;
do // test and increment with compare and swap
running = runningCount;
if (running >= maxConcurrentlyRunning)
return started;
} while (Interlocked.CompareExchange(ref runningCount, running + 1, running) != running);
Job job;
if (TryDequeNextJob(out job))
// dequeing job failed but we already incremented running count
Interlocked.Decrement(ref runningCount);
// ensure that no other thread queued an item and did not start it
// because the running count was too high
if (queue.Count == 0)
// if there is nothing in the queue after the decrement
// we can safely return
return started;
public void CancelOutstandingJobs()
Job job;
while (TryDequeNextJob(out job))
job.CompletionHandler.OnError(new OperationCanceledException());
private bool TryDequeNextJob(out Job job)
if (!queue.TryDequeue(out job))
return false;
} while (job.Cancel.IsDisposed);
return true;
private void StartJob(Job job)
if (job.Cancel.IsDisposed)
job.JobSubscription.Disposable =
u => OnJobCompleted(job, null),
e => OnJobCompleted(job, e)
if (job.Cancel.IsDisposed)
catch (Exception ex)
OnJobCompleted(job, ex);
protected virtual void OnJobCompleted(Job job, Exception error)
if (error == null)
job.CompletionHandler.OnNext(new Unit());
public class AutoJobQueue
: ManualJobQueue
int maxConcurrent;
public AutoJobQueue(int maxConcurrent)
if (maxConcurrent < 1)
throw new ArgumentOutOfRangeException("maxConcurrent");
this.maxConcurrent = maxConcurrent;
public override IObservable<Unit> Add(Func<IObservable<Unit>> asyncStart)
return Add(asyncStart, true);
public IObservable<Unit> Add(Func<IObservable<Unit>> asyncStart, bool autoStart)
var whenCompleted = base.Add(asyncStart);
if (autoStart)
return whenCompleted;
protected override void OnJobCompleted(Job job, Exception error)
base.OnJobCompleted(job, error);
if (error != null)
TaskPoolScheduler.Default.Schedule(() => StartUpTo(maxConcurrent));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment