Last active
October 22, 2015 21:24
-
-
Save andreaskoepf/e771f66720300ae143c8 to your computer and use it in GitHub Desktop.
Rx JobQueue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reactive; | |
using System.Reactive.Concurrency; | |
using System.Reactive.Disposables; | |
using System.Reactive.Linq; | |
using System.Reactive.Subjects; | |
using System.Threading; | |
namespace Xamla.Utilities | |
{ | |
/// <summary>
/// Contract for a queue of asynchronous jobs whose start is controlled by the caller
/// (see <see cref="StartNext"/> and <see cref="StartUpTo"/>).
/// </summary>
public interface IJobQueue
{
    /// <summary>Number of jobs currently counted as running.</summary>
    int RunningCount { get; }

    /// <summary>Number of jobs that are still waiting in the queue.</summary>
    int QueuedCount { get; }

    /// <summary>
    /// Stream of terminal notifications (completion or error), one per finished job.
    /// </summary>
    IObservable<Notification<Unit>> WhenJobCompletes { get; }

    /// <summary>Stream of the exceptions of jobs that ended with an error.</summary>
    IObservable<Exception> WhenJobFails { get; }

    /// <summary>
    /// Signals when no job is running any more and no job is left in the queue.
    /// </summary>
    IObservable<Unit> WhenQueueEmpty { get; }

    /// <summary>Queues a synchronous action as a job.</summary>
    /// <param name="action">The work to execute.</param>
    /// <returns>An observable that reports the outcome of this particular job.</returns>
    IObservable<Unit> Add(Action action);

    /// <summary>Queues an asynchronous job.</summary>
    /// <param name="asyncStart">Factory that starts the work and returns its result observable.</param>
    /// <returns>An observable that reports the outcome of this particular job.</returns>
    IObservable<Unit> Add(Func<IObservable<Unit>> asyncStart);

    /// <summary>Starts the next queued job, if there is one.</summary>
    /// <returns><c>true</c> when a job was taken from the queue; otherwise <c>false</c>.</returns>
    bool StartNext();

    /// <summary>
    /// Starts queued jobs as long as fewer than <paramref name="maxConcurrentlyRunning"/> are running.
    /// </summary>
    /// <param name="maxConcurrentlyRunning">Upper bound for the number of concurrently running jobs.</param>
    /// <returns>The number of jobs started by this call.</returns>
    int StartUpTo(int maxConcurrentlyRunning);

    /// <summary>Cancels all jobs that are queued but have not been started yet.</summary>
    void CancelOutstandingJobs();
}
/// <summary>
/// Job queue whose jobs only run when a caller explicitly requests it through
/// <see cref="StartNext"/> or <see cref="StartUpTo"/>.
/// </summary>
/// <remarks>
/// A started job owns one "running slot" (counted by <see cref="RunningCount"/>). The slot is
/// handed back exactly once: when the job completes, fails, or is cancelled by disposing a
/// subscription to the observable returned from <c>Add</c>. Jobs that are cancelled while they
/// are still queued never own a slot and therefore never change the running count.
/// A job that is cancelled while running is reported as failed with an
/// <see cref="OperationCanceledException"/>, the same way <see cref="CancelOutstandingJobs"/>
/// reports queued jobs.
/// </remarks>
public class ManualJobQueue
    : IJobQueue
{
    /// <summary>Bookkeeping for a single job; all fields are set by <c>Add</c>.</summary>
    protected struct Job
    {
        // Factory that starts the work and returns the observable signalling its outcome.
        public Func<IObservable<Unit>> AsyncStart;

        // Relays the outcome to the observable returned from Add and to WhenJobCompletes.
        public AsyncSubject<Unit> CompletionHandler;

        // Disposed when the job is cancelled; checked before the job is dequeued or started.
        public BooleanDisposable Cancel;

        // Holds the subscription to the running work so that cancellation can tear it down.
        public SerialDisposable JobSubscription;
    }

    readonly ConcurrentQueue<Job> queue;
    int runningCount;
    readonly Subject<Notification<Unit>> whenJobCompletes;
    readonly Subject<Unit> whenQueueEmpty;
    readonly IObservable<Exception> whenJobFails;

    /// <summary>Creates an empty queue with no running jobs.</summary>
    public ManualJobQueue()
    {
        queue = new ConcurrentQueue<Job>();
        whenJobCompletes = new Subject<Notification<Unit>>();
        whenQueueEmpty = new Subject<Unit>();

        // whenJobFails: the exceptions of all OnError notifications of the completion stream
        whenJobFails = whenJobCompletes
            .Where(n => n.Kind == NotificationKind.OnError)
            .Select(n => n.Exception)
            .Synchronize();
    }

    #region IJobQueue Implementation

    /// <summary>Terminal notification (completion or error) of every finished or cancelled job.</summary>
    public IObservable<Notification<Unit>> WhenJobCompletes
    {
        get { return whenJobCompletes.AsObservable(); }
    }

    /// <summary>Signals when the last running job released its slot while the queue was empty.</summary>
    public IObservable<Unit> WhenQueueEmpty
    {
        get { return whenQueueEmpty.AsObservable(); }
    }

    /// <summary>Exceptions of all jobs that ended with an error.</summary>
    public IObservable<Exception> WhenJobFails
    {
        get { return whenJobFails; }
    }

    /// <summary>Number of jobs that currently own a running slot.</summary>
    public int RunningCount
    {
        get { return runningCount; }
    }

    /// <summary>Number of jobs waiting in the queue.</summary>
    public int QueuedCount
    {
        get { return queue.Count; }
    }

    /// <summary>Queues a synchronous action; it is wrapped with <c>Observable.ToAsync</c>.</summary>
    /// <param name="action">The work to execute.</param>
    /// <returns>An observable reporting the outcome of the job.</returns>
    public IObservable<Unit> Add(Action action)
    {
        return Add(Observable.ToAsync(action));
    }

    /// <summary>Queues an asynchronous job without starting it.</summary>
    /// <param name="asyncStart">Factory that starts the work and returns its result observable.</param>
    /// <returns>
    /// An observable reporting the outcome of the job. Disposing a subscription to it cancels the job.
    /// </returns>
    public virtual IObservable<Unit> Add(Func<IObservable<Unit>> asyncStart)
    {
        Job job = new Job
        {
            AsyncStart = asyncStart,
            CompletionHandler = new AsyncSubject<Unit>(),
            Cancel = new BooleanDisposable(),
            JobSubscription = new SerialDisposable()
        };

        // pass on errors and completions to the queue-wide completion stream
        job.CompletionHandler
            .Materialize()
            .Where(n => n.Kind == NotificationKind.OnCompleted || n.Kind == NotificationKind.OnError)
            .Subscribe(whenJobCompletes.OnNext);

        // Unsubscribing disposes the job's subscription and its cancel flag as well.
        var cancelable = Observable.Create<Unit>(o =>
            new CompositeDisposable(
                job.CompletionHandler.Subscribe(o),
                job.JobSubscription,
                job.Cancel
            )
        );

        queue.Enqueue(job);
        return cancelable;
    }

    /// <summary>Starts the next queued job that has not been cancelled.</summary>
    /// <returns><c>true</c> when a job was dequeued and handed to the starter; otherwise <c>false</c>.</returns>
    public bool StartNext()
    {
        Job job;
        if (!TryDequeNextJob(out job))
            return false;

        Interlocked.Increment(ref runningCount);
        StartJob(job);
        return true;
    }

    /// <summary>
    /// Starts queued jobs while fewer than <paramref name="maxConcurrentlyRunning"/> jobs are running.
    /// </summary>
    /// <param name="maxConcurrentlyRunning">Upper bound for the number of running jobs.</param>
    /// <returns>The number of jobs started by this call.</returns>
    public int StartUpTo(int maxConcurrentlyRunning)
    {
        int started = 0;
        for (;;)
        {
            // Claim a running slot first: test and increment with compare and swap.
            int running;
            do
            {
                running = runningCount;
                if (running >= maxConcurrentlyRunning)
                    return started;
            } while (Interlocked.CompareExchange(ref runningCount, running + 1, running) != running);

            Job job;
            if (TryDequeNextJob(out job))
            {
                StartJob(job);
                ++started;
                continue;
            }

            // Dequeuing failed but the slot was already claimed: give it back.
            Interlocked.Decrement(ref runningCount);

            // Another thread may have queued a job and skipped starting it because our
            // temporary claim made the running count look too high. Only when the queue
            // is still empty after the decrement is it safe to return.
            if (queue.Count == 0)
                return started;
        }
    }

    /// <summary>
    /// Removes all queued jobs and fails each of them with an <see cref="OperationCanceledException"/>.
    /// </summary>
    public void CancelOutstandingJobs()
    {
        Job job;
        while (TryDequeNextJob(out job))
        {
            // These jobs were never started, so no running slot has to be released.
            job.Cancel.Dispose();
            job.CompletionHandler.OnError(new OperationCanceledException());
        }
    }

    #endregion

    // Dequeues the next job whose cancel flag is not set; already cancelled jobs are dropped.
    private bool TryDequeNextJob(out Job job)
    {
        do
        {
            if (!queue.TryDequeue(out job))
                return false;
        } while (job.Cancel.IsDisposed);
        return true;
    }

    // Hands back one running slot and signals WhenQueueEmpty when it was the last one
    // and nothing is left in the queue.
    private void ReleaseRunningSlot()
    {
        int queuedCount = queue.Count;
        if (Interlocked.Decrement(ref runningCount) == 0 && queuedCount == 0)
            whenQueueEmpty.OnNext(Unit.Default);
    }

    // Starts a job for which the caller has already claimed a running slot.
    private void StartJob(Job job)
    {
        if (job.Cancel.IsDisposed)
        {
            // Cancelled between dequeue and start: the claimed slot must not leak.
            ReleaseRunningSlot();
            return;
        }

        // 0 while the job still owns its slot, 1 once the slot has been handed back.
        int slotReleased = 0;
        Func<bool> tryReleaseSlot = () =>
        {
            if (Interlocked.Exchange(ref slotReleased, 1) != 0)
                return false;
            ReleaseRunningSlot();
            return true;
        };

        // Reports the outcome once; later signals for the same job are ignored.
        Action<Exception> finish = error =>
        {
            if (tryReleaseSlot())
                OnJobCompleted(job, error);
        };

        try
        {
            // onCompleted is handled too, so a source that ends without a value still finishes the job.
            IDisposable subscription = job.AsyncStart().Subscribe(
                u => finish(null),
                e => finish(e),
                () => finish(null)
            );

            // Disposing the job subscription while the job is still running is a cancellation:
            // stop the work, then report it so the slot is released and subclasses can react.
            job.JobSubscription.Disposable = new CompositeDisposable(
                subscription,
                Disposable.Create(() => finish(new OperationCanceledException()))
            );

            if (job.Cancel.IsDisposed)
                job.JobSubscription.Dispose();
        }
        catch (Exception ex)
        {
            tryReleaseSlot();
            OnJobCompleted(job, ex);
        }
    }

    /// <summary>
    /// Publishes the outcome of a job through its completion handler.
    /// </summary>
    /// <param name="job">The job that finished.</param>
    /// <param name="error">The failure, or <c>null</c> when the job succeeded.</param>
    protected virtual void OnJobCompleted(Job job, Exception error)
    {
        if (error == null)
        {
            job.CompletionHandler.OnNext(Unit.Default);
            job.CompletionHandler.OnCompleted();
        }
        else
        {
            job.CompletionHandler.OnError(error);
        }
    }
}
/// <summary>
/// Job queue that starts queued jobs by itself, keeping at most a fixed number of them running.
/// </summary>
public class AutoJobQueue
    : ManualJobQueue
{
    // Upper bound passed to StartUpTo whenever the queue tries to start more jobs.
    readonly int maxConcurrent;

    /// <summary>Creates a queue that runs at most <paramref name="maxConcurrent"/> jobs at once.</summary>
    /// <param name="maxConcurrent">Concurrency limit; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxConcurrent"/> is smaller than 1.
    /// </exception>
    public AutoJobQueue(int maxConcurrent)
    {
        if (maxConcurrent <= 0)
        {
            throw new ArgumentOutOfRangeException("maxConcurrent");
        }

        this.maxConcurrent = maxConcurrent;
    }

    /// <summary>Queues a job and immediately tries to start queued jobs.</summary>
    /// <param name="asyncStart">Factory that starts the work and returns its result observable.</param>
    /// <returns>An observable reporting the outcome of the job.</returns>
    public override IObservable<Unit> Add(Func<IObservable<Unit>> asyncStart)
    {
        return Add(asyncStart, true);
    }

    /// <summary>Queues a job and optionally tries to start queued jobs right away.</summary>
    /// <param name="asyncStart">Factory that starts the work and returns its result observable.</param>
    /// <param name="autoStart">When <c>false</c>, the job is only queued.</param>
    /// <returns>An observable reporting the outcome of the job.</returns>
    public IObservable<Unit> Add(Func<IObservable<Unit>> asyncStart, bool autoStart)
    {
        IObservable<Unit> outcome = base.Add(asyncStart);
        if (!autoStart)
        {
            return outcome;
        }

        StartUpTo(maxConcurrent);
        return outcome;
    }

    /// <summary>Publishes the outcome of a job and then tries to start further queued jobs.</summary>
    /// <param name="job">The job that finished.</param>
    /// <param name="error">The failure, or <c>null</c> when the job succeeded.</param>
    protected override void OnJobCompleted(Job job, Exception error)
    {
        base.OnJobCompleted(job, error);

        if (error == null)
        {
            // Successful job: start further jobs directly on the current thread.
            StartUpTo(maxConcurrent);
            return;
        }

        // Failed job: starting further jobs is deferred to the task pool.
        // NOTE(review): presumably to keep it out of the failing job's error path - confirm.
        TaskPoolScheduler.Default.Schedule(() => { StartUpTo(maxConcurrent); });
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment