Skip to content

Instantly share code, notes, and snippets.

@andreaskoepf
Last active October 22, 2015 21:24
Show Gist options
  • Save andreaskoepf/e771f66720300ae143c8 to your computer and use it in GitHub Desktop.
Save andreaskoepf/e771f66720300ae143c8 to your computer and use it in GitHub Desktop.
Rx JobQueue
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
namespace Xamla.Utilities
{
public interface IJobQueue
{
IObservable<Notification<Unit>> WhenJobCompletes { get; }
IObservable<Unit> WhenQueueEmpty { get; }
IObservable<Exception> WhenJobFails { get; }
int RunningCount { get; }
int QueuedCount { get; }
IObservable<Unit> Add(Action action);
IObservable<Unit> Add(Func<IObservable<Unit>> asyncStart);
bool StartNext();
int StartUpTo(int maxConcurrentlyRunning);
void CancelOutstandingJobs();
}
public class ManualJobQueue
: IJobQueue
{
protected struct Job
{
public Func<IObservable<Unit>> AsyncStart;
public AsyncSubject<Unit> CompletionHandler;
public BooleanDisposable Cancel;
public SerialDisposable JobSubscription;
}
ConcurrentQueue<Job> queue;
int runningCount;
Subject<Notification<Unit>> whenJobCompletes;
Subject<Unit> whenQueueEmpty;
IObservable<Exception> whenJobFails;
public ManualJobQueue()
{
queue = new ConcurrentQueue<Job>();
whenJobCompletes = new Subject<Notification<Unit>>();
whenQueueEmpty = new Subject<Unit>();
// whenJobFailes subscription
whenJobFails = whenJobCompletes.Where(n => n.Kind == NotificationKind.OnError)
.Select(n => n.Exception)
.Synchronize();
// whenQueueEmpty subscription
whenJobCompletes.Subscribe(n =>
{
int queueCount = queue.Count;
if (Interlocked.Decrement(ref runningCount) == 0 && queueCount == 0)
whenQueueEmpty.OnNext(new Unit());
});
}
#region IJobQueue Implementation
public IObservable<Notification<Unit>> WhenJobCompletes
{
get { return whenJobCompletes.AsObservable(); }
}
public IObservable<Unit> WhenQueueEmpty
{
get { return whenQueueEmpty.AsObservable(); }
}
public IObservable<Exception> WhenJobFails
{
get { return whenJobFails; }
}
public int RunningCount
{
get { return runningCount; }
}
public int QueuedCount
{
get { return queue.Count; }
}
public IObservable<Unit> Add(Action action)
{
return Add(Observable.ToAsync(action));
}
public virtual IObservable<Unit> Add(Func<IObservable<Unit>> asyncStart)
{
Job job = new Job
{
AsyncStart = asyncStart,
CompletionHandler = new AsyncSubject<Unit>(),
Cancel = new BooleanDisposable(),
JobSubscription = new SerialDisposable()
};
var cancelable = Observable.Create<Unit>(o =>
new CompositeDisposable(
job.CompletionHandler.Subscribe(o),
job.JobSubscription,
job.Cancel
)
);
job.CompletionHandler
.Materialize()
.Where(n => n.Kind == NotificationKind.OnCompleted || n.Kind == NotificationKind.OnError)
.Subscribe(whenJobCompletes.OnNext); // pass on errors and completions
queue.Enqueue(job);
return cancelable;
}
public bool StartNext()
{
Job job;
if (TryDequeNextJob(out job))
{
Interlocked.Increment(ref runningCount);
StartJob(job);
return true;
}
return false;
}
public int StartUpTo(int maxConcurrentlyRunning)
{
int started = 0;
for (;;)
{
for (;;)
{
int running = 0;
do // test and increment with compare and swap
{
running = runningCount;
if (running >= maxConcurrentlyRunning)
return started;
} while (Interlocked.CompareExchange(ref runningCount, running + 1, running) != running);
Job job;
if (TryDequeNextJob(out job))
{
StartJob(job);
++started;
}
else
{
// dequeing job failed but we already incremented running count
Interlocked.Decrement(ref runningCount);
// ensure that no other thread queued an item and did not start it
// because the running count was too high
if (queue.Count == 0)
{
// if there is nothing in the queue after the decrement
// we can safely return
return started;
}
}
}
}
}
public void CancelOutstandingJobs()
{
Job job;
while (TryDequeNextJob(out job))
{
job.Cancel.Dispose();
job.CompletionHandler.OnError(new OperationCanceledException());
}
}
#endregion
private bool TryDequeNextJob(out Job job)
{
do
{
if (!queue.TryDequeue(out job))
return false;
} while (job.Cancel.IsDisposed);
return true;
}
private void StartJob(Job job)
{
if (job.Cancel.IsDisposed)
return;
try
{
job.JobSubscription.Disposable =
job.AsyncStart().Subscribe(
u => OnJobCompleted(job, null),
e => OnJobCompleted(job, e)
);
if (job.Cancel.IsDisposed)
job.JobSubscription.Dispose();
}
catch (Exception ex)
{
OnJobCompleted(job, ex);
}
}
protected virtual void OnJobCompleted(Job job, Exception error)
{
if (error == null)
{
job.CompletionHandler.OnNext(new Unit());
job.CompletionHandler.OnCompleted();
}
else
{
job.CompletionHandler.OnError(error);
}
}
}
public class AutoJobQueue
: ManualJobQueue
{
int maxConcurrent;
public AutoJobQueue(int maxConcurrent)
{
if (maxConcurrent < 1)
throw new ArgumentOutOfRangeException("maxConcurrent");
this.maxConcurrent = maxConcurrent;
}
public override IObservable<Unit> Add(Func<IObservable<Unit>> asyncStart)
{
return Add(asyncStart, true);
}
public IObservable<Unit> Add(Func<IObservable<Unit>> asyncStart, bool autoStart)
{
var whenCompleted = base.Add(asyncStart);
if (autoStart)
StartUpTo(maxConcurrent);
return whenCompleted;
}
protected override void OnJobCompleted(Job job, Exception error)
{
base.OnJobCompleted(job, error);
if (error != null)
TaskPoolScheduler.Default.Schedule(() => StartUpTo(maxConcurrent));
else
StartUpTo(maxConcurrent);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment