Skip to content

Instantly share code, notes, and snippets.

@ctigeek
Last active January 5, 2017 02:15
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ctigeek/137ba9c7410d52d647f8 to your computer and use it in GitHub Desktop.
Save ctigeek/137ba9c7410d52d647f8 to your computer and use it in GitHub Desktop.
Run the tasks emitted by an IObservable in parallel and publish their results.
/// <summary>
/// Turns an observable sequence of tasks into an observable sequence of their results,
/// holding at most <c>maxConcurrency</c> unpublished tasks at any time.
/// </summary>
/// <remarks>
/// <para>
/// Back-pressure is applied by blocking: once <c>maxConcurrency</c> tasks are held, the
/// source's <c>OnNext</c> call does not return until one of them has finished
/// (<see cref="Task.WaitAny(Task[])"/>). Results are published as tasks finish, which is
/// not necessarily the order in which the tasks arrived.
/// </para>
/// <para>
/// The instance tracks a single observer and acts as its own subscription handle.
/// It performs no synchronization; it assumes the source delivers notifications serially.
/// </para>
/// </remarks>
internal class TObservable<T> : IObservable<T>, IDisposable
{
    private readonly IObservable<Task<T>> taskObservable;
    private IDisposable taskObservableSubscriber;
    private IObserver<T> observer;
    private readonly int maxConcurrency;
    private Task<T>[] currentTasks;
    private int taskCount = 0;
    private int nextIndex = 0;

    // True once a terminal notification was sent or the subscription was disposed;
    // from then on nothing further is forwarded to the observer.
    private bool stopped;

    /// <summary>Creates the adapter without subscribing to the source yet.</summary>
    /// <param name="taskObservable">Source sequence of tasks.</param>
    /// <param name="maxConcurrency">Maximum number of tasks held at once; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxConcurrency"/> is less than 1.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="taskObservable"/> is null.</exception>
    public TObservable(IObservable<Task<T>> taskObservable, int maxConcurrency)
    {
        if (maxConcurrency < 1)
        {
            throw new ArgumentOutOfRangeException("maxConcurrency", maxConcurrency, "maxConcurrency must be at least 1.");
        }
        if (taskObservable == null)
        {
            throw new ArgumentNullException("taskObservable");
        }

        this.taskObservable = taskObservable;
        this.maxConcurrency = maxConcurrency;
        this.currentTasks = new Task<T>[maxConcurrency];
    }

    /// <summary>
    /// Publishes the outcome of a finished task: its result via <c>OnNext</c>, or a terminal
    /// <c>OnError</c> when the task faulted or was cancelled.
    /// </summary>
    private void CallSubscriber(Task<T> task)
    {
        if (task.IsFaulted)
        {
            this.OnError(task.Exception);
        }
        else if (task.IsCanceled)
        {
            // you may want to call OnCompleted instead based on how you're using the cancellation token.
            this.OnError(new OperationCanceledException("The task was cancelled."));
        }
        else
        {
            this.observer.OnNext(task.Result);
        }
    }

    /// <summary>
    /// Registers <paramref name="observer"/> and subscribes to the task source.
    /// Any previous subscription held by this instance is disposed and its state discarded.
    /// </summary>
    /// <param name="observer">Receiver of the task results.</param>
    /// <returns>This instance; disposing it unsubscribes from the source.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }

        // Start from a clean slate so the instance stays usable after an earlier run.
        this.Dispose();
        this.currentTasks = new Task<T>[this.maxConcurrency];
        this.taskCount = 0;
        this.nextIndex = 0;
        this.observer = observer;
        this.stopped = false;

        IDisposable subscription = this.taskObservable.Subscribe(new TaskObserver(this));
        if (this.stopped)
        {
            // The source terminated synchronously inside Subscribe, so Dispose already ran
            // before this handle existed; release it here instead of leaking it.
            if (subscription != null)
            {
                subscription.Dispose();
            }
        }
        else
        {
            this.taskObservableSubscriber = subscription;
        }
        return this;
    }

    /// <summary>
    /// Stores an incoming task; when the window is full, blocks until one held task finishes
    /// and publishes its outcome.
    /// </summary>
    private void OnNext(Task<T> task)
    {
        if (this.stopped)
        {
            return;
        }
        if (task == null)
        {
            // A null entry would make Task.WaitAny throw later; surface it as a stream error now.
            this.OnError(new ArgumentNullException("task", "The source produced a null task."));
            return;
        }

        this.currentTasks[this.nextIndex] = task;
        this.taskCount++;
        if (this.taskCount < this.maxConcurrency)
        {
            // Still filling the window for the first time: slots are handed out in order.
            this.nextIndex++;
            return;
        }

        // Window is full: wait for any task to finish and reuse its slot for the next arrival.
        int finished = Task.WaitAny(this.currentTasks);
        Task<T> done = this.currentTasks[finished];
        this.currentTasks[finished] = null;
        this.taskCount--;
        this.nextIndex = finished;
        this.CallSubscriber(done);
    }

    /// <summary>Sends a single terminal error to the observer and releases the source subscription.</summary>
    private void OnError(Exception exception)
    {
        if (this.stopped)
        {
            return;
        }
        this.stopped = true;
        this.Dispose();
        this.observer.OnError(exception);
    }

    /// <summary>
    /// Drains the tasks still held after the source completed, then signals completion.
    /// Draining stops as soon as a task fails or the subscriber disposes, so no notification
    /// follows a terminal one.
    /// </summary>
    private void OnCompleted()
    {
        while (this.taskCount > 0 && !this.stopped)
        {
            // Task.WaitAny rejects null entries, so wait on the occupied slots only.
            Task<T>[] pending = this.currentTasks.Where(t => t != null).ToArray();
            Task<T> done = pending[Task.WaitAny(pending)];
            this.currentTasks[Array.IndexOf(this.currentTasks, done)] = null;
            this.taskCount--;
            this.CallSubscriber(done);
        }

        if (this.stopped)
        {
            return;
        }
        this.stopped = true;
        this.Dispose();
        this.observer.OnCompleted();
    }

    /// <summary>
    /// Unsubscribes from the source and stops all further notifications to the observer.
    /// Tasks that are still running are not cancelled: they were created by the source and
    /// this class holds no cancellation handle for them.
    /// </summary>
    public void Dispose()
    {
        this.stopped = true;

        IDisposable subscription = this.taskObservableSubscriber;
        this.taskObservableSubscriber = null;
        if (subscription != null)
        {
            subscription.Dispose();
        }
    }

    /// <summary>Forwards the source's notifications to the owning <see cref="TObservable{T}"/>.</summary>
    private sealed class TaskObserver : IObserver<Task<T>>
    {
        private readonly TObservable<T> owner;

        public TaskObserver(TObservable<T> owner)
        {
            this.owner = owner;
        }

        public void OnNext(Task<T> value)
        {
            this.owner.OnNext(value);
        }

        public void OnError(Exception error)
        {
            this.owner.OnError(error);
        }

        public void OnCompleted()
        {
            this.owner.OnCompleted();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment