Last active April 25, 2017 21:12
Process IObservable<T> asynchronously keeping the same order of events — see this discussion:
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
public static class ObservableExtensions
{
    public static IObservable<TResult> SelectAsync<TSource, TResult>(
        this IObservable<TSource> src,
        Func<TSource, Task<TResult>> selectorAsync)
    {
        // using local variable for counter is easier than src.Scan(...)
        var counter = 0;
        var streamOfTasks =
            from source in src
            from result in Observable.FromAsync(async () => new
            {
                Index = Interlocked.Increment(ref counter) - 1,
                Result = await selectorAsync(source)
            })
            select result;
        // buffer the results coming out of order
        return Observable.Create<TResult>(observer =>
        {
            var index = 0;
            var buffer = new Dictionary<int, TResult>();
            return streamOfTasks.Subscribe(item =>
            {
                buffer.Add(item.Index, item.Result);
                TResult result;
                while (buffer.TryGetValue(index, out result))
                {
                    buffer.Remove(index); // reconstructed from here on: the listing is cut off after the while line
                    observer.OnNext(result);
                    index++;
                }
            }, observer.OnError, observer.OnCompleted);
        });
    }
}
yallie commented Apr 12, 2017


var sourceEvents = ...; // get the IObservable<TSource>
var processedEvents = sourceEvents.SelectAsync(async x => await ProcessAsync(x)); // process each event asynchronously

Based on Enigmativity's simple approach and this SO question.
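
The reordering step inside SelectAsync can be tried in isolation, without Rx. The sketch below is only an illustration of that idea: `ReorderBuffer<T>` and `ReorderDemo` are hypothetical names, not part of the gist, and the completion order 2, 0, 1 is made up for the demo.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the gist): releases items strictly by index.
public sealed class ReorderBuffer<T>
{
    private readonly Dictionary<int, T> pending = new Dictionary<int, T>();
    private int next; // index of the next item allowed to leave the buffer

    // Accepts an item that may arrive out of order and returns every item
    // that has become releasable, in index order.
    public List<T> Add(int index, T value)
    {
        pending.Add(index, value);
        var ready = new List<T>();
        while (pending.TryGetValue(next, out var item))
        {
            pending.Remove(next);
            ready.Add(item);
            next++;
        }
        return ready;
    }
}

public static class ReorderDemo
{
    // Feeds results in the (assumed) completion order 2, 0, 1
    // and collects everything the buffer releases.
    public static List<string> Run()
    {
        var buffer = new ReorderBuffer<string>();
        var output = new List<string>();
        output.AddRange(buffer.Add(2, "c")); // held back: 0 and 1 are still missing
        output.AddRange(buffer.Add(0, "a")); // releases "a"
        output.AddRange(buffer.Add(1, "b")); // releases "b", then "c"
        return output;
    }
}
```

Calling `ReorderDemo.Run()` yields the items as a, b, c even though they were added as c, a, b. The buffer holds only the items that finished ahead of a slower predecessor.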

yallie commented Apr 12, 2017

Alternative approach, using TPL Dataflow (as suggested by JSteward and VMAtm):

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

IObservable<long> streamOfMessages = ...; // get the source observable stream (element type must match the block's input type)

// use TPL TransformBlock to process the stream events keeping the same order
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };
var streamOfTasks = new TransformBlock<long, long>(async msg => await ProcessAsync(msg), options);

// feed the source stream into the block; without this the block never receives any input
streamOfMessages.Subscribe(streamOfTasks.AsObserver());

var streamOfResults = streamOfTasks.AsObservable(); // convert back to observable
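
To check the ordering guarantee of the Dataflow approach without any Rx plumbing, a minimal sketch like the following may help. It assumes a hypothetical `ProcessAsync` stand-in in which later inputs finish sooner, posts the inputs directly instead of bridging from an observable, and uses a fixed degree of parallelism of 4 so that all four items run concurrently.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowOrderDemo
{
    // Hypothetical stand-in for ProcessAsync: later inputs complete sooner,
    // so raw completion order is the reverse of input order.
    private static async Task<long> ProcessAsync(long msg)
    {
        await Task.Delay(TimeSpan.FromMilliseconds(50 - 10 * msg));
        return msg * msg;
    }

    public static async Task<List<long>> RunAsync()
    {
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
        var block = new TransformBlock<long, long>(msg => ProcessAsync(msg), options);

        // Post inputs 0..3, then signal that no more input will arrive.
        for (long i = 0; i < 4; i++)
        {
            block.Post(i);
        }
        block.Complete();

        // Drain the block; TransformBlock emits outputs in input order by default.
        var results = new List<long>();
        while (await block.OutputAvailableAsync())
        {
            results.Add(await block.ReceiveAsync());
        }
        return results;
    }
}
```

`RunAsync` returns 0, 1, 4, 9: the block holds back the faster results until their predecessors are done, which is the same effect the Dictionary buffer achieves in SelectAsync.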
