Last active
January 5, 2017 02:15
-
-
Save ctigeek/137ba9c7410d52d647f8 to your computer and use it in GitHub Desktop.
Run tasks in parallel returned from an IObservable.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
internal class TObservable<T> : IObservable<T>, IDisposable | |
{ | |
private readonly IObservable<Task<T>> taskObservable; | |
private IDisposable taskObservableSubscriber; | |
private IObserver<T> observer; | |
private readonly int maxConcurrency; | |
private Task<T>[] currentTasks; | |
private int taskCount = 0; | |
private int nextIndex = 0; | |
public TObservable(IObservable<Task<T>> taskObservable, int maxConcurrency) | |
{ | |
this.taskObservable = taskObservable; | |
this.maxConcurrency = maxConcurrency; | |
this.currentTasks = new Task<T>[maxConcurrency]; | |
} | |
private void CallSubscriber(Task<T> task) | |
{ | |
if (task.IsFaulted) | |
{ | |
this.OnError(task.Exception); | |
} | |
else if (task.IsCanceled) | |
{ | |
// you may want to call OnCompleted instead based on how you're using the cancellation token. | |
this.OnError(new OperationCanceledException("The task was cancelled.")); | |
} | |
else | |
{ | |
this.observer.OnNext(task.Result); | |
} | |
} | |
public IDisposable Subscribe(IObserver<T> observer) | |
{ | |
this.observer = observer; | |
this.taskObservableSubscriber = taskObservable.Subscribe(OnNext, OnError, OnCompleted); | |
return this; | |
} | |
private void OnNext(Task<T> task) | |
{ | |
currentTasks[nextIndex] = task; | |
taskCount++; | |
if (taskCount == maxConcurrency) | |
{ | |
nextIndex = Task.WaitAny(currentTasks); | |
CallSubscriber(currentTasks[nextIndex]); | |
currentTasks[nextIndex] = null; | |
taskCount--; | |
} | |
else | |
{ | |
nextIndex++; | |
} | |
} | |
private void OnError(Exception exception) | |
{ | |
this.Dispose(); | |
this.observer.OnError(exception); | |
} | |
private void OnCompleted() | |
{ | |
while (taskCount > 0) | |
{ | |
currentTasks = currentTasks.Where(t => t != null).ToArray(); | |
nextIndex = Task.WaitAny(currentTasks); | |
CallSubscriber(currentTasks[nextIndex]); | |
currentTasks[nextIndex] = null; | |
taskCount--; | |
} | |
this.Dispose(); | |
this.observer.OnCompleted(); | |
} | |
public void Dispose() | |
{ | |
if (taskCount > 0) | |
{ | |
//cancel running tasks? | |
} | |
if (this.taskObservableSubscriber != null) | |
{ | |
this.taskObservableSubscriber.Dispose(); | |
this.taskObservableSubscriber = null; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment