Skip to content

Instantly share code, notes, and snippets.

@anaisbetts
Created October 22, 2013 23:41
Show Gist options
  • Save anaisbetts/7110098 to your computer and use it in GitHub Desktop.
Save anaisbetts/7110098 to your computer and use it in GitHub Desktop.
Rx-based HTTP requests that disconnect as early as possible when asked
/// <summary>
/// Sends an HTTP request and attempts to cancel the request as soon as
/// possible if requested to do so.
/// </summary>
/// <remarks>
/// The request is sent with <c>HttpCompletionOption.ResponseHeadersRead</c>,
/// so <paramref name="shouldFetchContent"/> is consulted before any of the
/// body is buffered. The returned sequence is wrapped in
/// <c>PublishLast().RefCount()</c>, so the result is shared between
/// subscribers, and disposing the subscription cancels the token that is
/// passed to <c>SendAsync</c>.
/// </remarks>
/// <param name="This">The client used to send the request.</param>
/// <param name="request">The HTTP request to make</param>
/// <param name="shouldFetchContent">If given, this predicate allows you
/// to cancel the request based on the returned headers. Return false to
/// cancel reading the body; the sequence then completes without producing
/// a value and the response is disposed.</param>
/// <returns>A tuple of the HTTP Response and the full message
/// contents. Failures while sending, evaluating the predicate or reading
/// the body are reported through OnError.</returns>
public static IObservable<Tuple<HttpResponseMessage, byte[]>> SendAsyncObservable(this HttpClient This, HttpRequestMessage request, Func<HttpResponseMessage, bool> shouldFetchContent = null)
{
    shouldFetchContent = shouldFetchContent ?? (_ => true);

    var ret = Observable.Create<Tuple<HttpResponseMessage, byte[]>>(async (subj, ct) => {
        // Tracked outside the try block so that every exit path which does
        // not hand the response to the subscriber can dispose it.
        HttpResponseMessage resp = null;

        try {
            resp = await This.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, ct);

            if (!shouldFetchContent(resp)) {
                // The caller does not want the body. Dispose the response
                // right away instead of leaving the unread body (and the
                // connection behind it) to the garbage collector, then
                // finish with an empty sequence.
                resp.Dispose();
                resp = null;
                subj.OnCompleted();
                return;
            }

            byte[] data;

            // ReadAsByteArrayAsync() takes no cancellation token here, so
            // abort an in-flight body read by disposing the response when
            // the subscription is cancelled. The registration is removed as
            // soon as the read has finished.
            using (ct.Register(resp.Dispose)) {
                data = await resp.Content.ReadAsByteArrayAsync();
            }

            var result = Tuple.Create(resp, data);

            // Ownership of the response passes to the subscriber; make sure
            // the catch block below never disposes it after this point.
            resp = null;

            subj.OnNext(result);
            subj.OnCompleted();
        } catch (Exception ex) {
            if (resp != null) {
                resp.Dispose();
            }

            subj.OnError(ex);
        }
    });

    return ret.PublishLast().RefCount();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment