@cwharris
Last active January 26, 2022 18:55
If you can understand this, you have a pretty good understanding of Rx, Async, Generics, and Extension Methods.
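using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Extension methods must live in a static class; this class name is illustrative.
public static class ObservableBatchExtensions
{
    // Buffers the source into batches of up to `count` items, processes each batch
    // asynchronously, and flattens every response back into a stream of results.
    public static IObservable<TResult> BatchAsync<T, TResponse, TResult>(
        this IObservable<T> source,
        int count,
        Func<IEnumerable<T>, CancellationToken, Task<TResponse>> process,
        Func<TResponse, IEnumerable<TResult>> resultSelector
    )
    {
        return source
            .Buffer(count)
            // SelectMany merges batches as they complete, so results from different batches may interleave.
            .SelectMany(batch =>
                Observable
                    .FromAsync(token => process(batch, token))
                    .SelectMany(x => resultSelector(x))
            );
    }
}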

// Usage: chain two APIs that accept different maximum batch sizes per request.
Observable.Empty<ThingToProcess>()
    .BatchAsync(100, (batch, token) => apiCapableOf100PerRequest.ProcessAsync(batch, token), (response) => response.Things)
    .BatchAsync(25, (batch, token) => apiCapableOf25PerRequest.ProcessAsync(batch, token), (response) => response.Things)
    .Subscribe(thing => {
        Console.WriteLine("Thing was processed: {0}", thing.Id);
    });
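
The usage above relies on `ThingToProcess` and two API objects that the gist does not define. As a rough, self-contained sketch of the same pipeline, the program below swaps in a hypothetical `FakeResponse` type and a hypothetical `ProcessAsync` helper that doubles each integer and rejects oversized batches. Those names, the 10 ms delay, and the `BatchAsyncDemo` class are assumptions made for illustration only; the sketch also assumes the System.Reactive package is referenced. `BatchAsync` is repeated so the example compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BatchAsyncDemo
{
    // Same shape as the gist's extension method, repeated so this sketch is standalone.
    public static IObservable<TResult> BatchAsync<T, TResponse, TResult>(
        this IObservable<T> source,
        int count,
        Func<IEnumerable<T>, CancellationToken, Task<TResponse>> process,
        Func<TResponse, IEnumerable<TResult>> resultSelector)
    {
        return source
            .Buffer(count)
            .SelectMany(batch =>
                Observable
                    .FromAsync(token => process(batch, token))
                    .SelectMany(x => resultSelector(x)));
    }

    // Hypothetical response type standing in for whatever a real API would return.
    public sealed class FakeResponse
    {
        public FakeResponse(IEnumerable<int> things) { Things = things; }
        public IEnumerable<int> Things { get; }
    }

    // Hypothetical API call: doubles each item and enforces a per-request limit.
    public static async Task<FakeResponse> ProcessAsync(
        IEnumerable<int> batch, int limit, CancellationToken token)
    {
        var items = batch.ToList();
        if (items.Count > limit)
            throw new ArgumentException("Batch exceeds the per-request limit.");
        await Task.Delay(10, token); // simulated network latency
        return new FakeResponse(items.Select(x => x * 2).ToList());
    }

    // Runs 250 integers through a 100-per-request stage, then a 25-per-request stage.
    public static async Task<IList<int>> RunAsync()
    {
        return await Observable.Range(1, 250)
            .BatchAsync(100, (batch, token) => ProcessAsync(batch, 100, token), r => r.Things)
            .BatchAsync(25, (batch, token) => ProcessAsync(batch, 25, token), r => r.Things)
            .ToList();
    }

    public static async Task Main()
    {
        var results = await RunAsync();
        Console.WriteLine("Processed {0} things", results.Count);
    }
}
```

Because batches are merged as their tasks complete, the sketch only relies on the number of results and their sum, not on their order. If ordering matters, one option is to replace the outer `SelectMany` with `Select(...)` followed by `Concat()`, at the cost of processing one batch at a time.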