Skip to content

Instantly share code, notes, and snippets.

@yallie
Last active April 25, 2017 21:12
Show Gist options
  • Save yallie/0b9e48f2999a8f421997a71559341d71 to your computer and use it in GitHub Desktop.
Save yallie/0b9e48f2999a8f421997a71559341d71 to your computer and use it in GitHub Desktop.
Process IObservable<T> asynchronously keeping the same order of events — see this discussion: http://stackoverflow.com/questions/43314307/unwrapping-iobservabletaskt-into-iobservablet
public static class ObservableExtensions
{
public static IObservable<TResult> SelectAsync<TSource, TResult>(
this IObservable<TSource> src,
Func<TSource, Task<TResult>> selectorAsync)
{
// using local variable for counter is easier than src.Scan(...)
var counter = 0;
var streamOfTasks =
from source in src
from result in Observable.FromAsync(async () => new
{
Index = Interlocked.Increment(ref counter) - 1,
Result = await selectorAsync(source)
})
select result;
// buffer the results coming out of order
return Observable.Create<TResult>(observer =>
{
var index = 0;
var buffer = new Dictionary<int, TResult>();
return streamOfTasks.Subscribe(item =>
{
buffer.Add(item.Index, item.Result);
TResult result;
while (buffer.TryGetValue(index, out result))
{
buffer.Remove(index);
observer.OnNext(result);
index++;
}
});
});
}
}
@yallie
Copy link
Author

yallie commented Apr 12, 2017

Usage:

var sourceEvents = ...; // get the IObservable<TSource>
var processedEvents = sourceEvents.SelectAsync(async x => await ProcessAsync(x)); // process each event asynchronously

Based on Enigmativity's simple approach and this SO question.

@yallie
Copy link
Author

yallie commented Apr 12, 2017

Alternative approach, using TPL DataFlow (as suggested by JSteward and VMAtm):

using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

IObservable<Message> streamOfMessages = ...; // get the source observable stream

// use TPL TransformBlock to process the stream events keeping the same order
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };
var streamOfTasks = new TransformBlock<long, long>(async msg => await ProcessAsync(msg), options);

var streamOfResults = streamOfTasks.AsObservable(); // convert back to observable

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment