Last active
April 25, 2017 21:12
-
-
Save yallie/0b9e48f2999a8f421997a71559341d71 to your computer and use it in GitHub Desktop.
Process IObservable<T> asynchronously keeping the same order of events — see this discussion: http://stackoverflow.com/questions/43314307/unwrapping-iobservabletaskt-into-iobservablet
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static class ObservableExtensions | |
{ | |
public static IObservable<TResult> SelectAsync<TSource, TResult>( | |
this IObservable<TSource> src, | |
Func<TSource, Task<TResult>> selectorAsync) | |
{ | |
// using local variable for counter is easier than src.Scan(...) | |
var counter = 0; | |
var streamOfTasks = | |
from source in src | |
from result in Observable.FromAsync(async () => new | |
{ | |
Index = Interlocked.Increment(ref counter) - 1, | |
Result = await selectorAsync(source) | |
}) | |
select result; | |
// buffer the results coming out of order | |
return Observable.Create<TResult>(observer => | |
{ | |
var index = 0; | |
var buffer = new Dictionary<int, TResult>(); | |
return streamOfTasks.Subscribe(item => | |
{ | |
buffer.Add(item.Index, item.Result); | |
TResult result; | |
while (buffer.TryGetValue(index, out result)) | |
{ | |
buffer.Remove(index); | |
observer.OnNext(result); | |
index++; | |
} | |
}); | |
}); | |
} | |
} |
Alternative approach, using TPL DataFlow (as suggested by JSteward and VMAtm):
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
IObservable<Message> streamOfMessages = ...; // get the source observable stream
// use TPL TransformBlock to process the stream events keeping the same order
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };
var streamOfTasks = new TransformBlock<long, long>(async msg => await ProcessAsync(msg), options);
var streamOfResults = streamOfTasks.AsObservable(); // convert back to observable
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Usage:
Based on Enigmativity's simple approach and this SO question.