@markusjohnsson
Created January 30, 2014 09:49
IObservable.FirstOrDefaultObservable() - a non-blocking FirstOrDefault for IObservable
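using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class ObservableExtensions
{
    // Emits the first element of source, or default(T) if source completes
    // without emitting, then completes. Never blocks the calling thread.
    public static IObservable<T> FirstOrDefaultObservable<T>(this IObservable<T> source)
    {
        return Observable.Create<T>(observer =>
        {
            bool hasFired = false;

            // SingleAssignmentDisposable may be disposed before the inner
            // subscription is assigned. That matters when source emits
            // synchronously inside Subscribe (e.g. Observable.Return): a plain
            // IDisposable variable would still be null in the OnNext handler.
            var subscription = new SingleAssignmentDisposable();
            subscription.Disposable = source.Subscribe(
                next =>
                {
                    if (!hasFired)
                    {
                        hasFired = true;
                        observer.OnNext(next);
                        observer.OnCompleted();
                        subscription.Dispose();
                    }
                },
                error => observer.OnError(error),
                () =>
                {
                    if (!hasFired)
                    {
                        // Source completed empty: emit the default value.
                        observer.OnNext(default(T));
                        observer.OnCompleted();
                    }
                });
            return subscription;
        });
    }
}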
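
A minimal usage sketch for the extension above, assuming the System.Reactive package is referenced. The class name FirstOrDefaultObservableDemo and the sample values are hypothetical and not part of the gist. The non-empty case uses a Subject so that the value arrives after subscription has finished.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class, not part of the original gist.
public static class FirstOrDefaultObservableDemo
{
    public static void Main()
    {
        // Non-empty case: the subject emits after the subscription is set up.
        var subject = new Subject<int>();
        subject.FirstOrDefaultObservable().Subscribe(
            value => Console.WriteLine("first: " + value),
            () => Console.WriteLine("completed"));
        subject.OnNext(42); // prints "first: 42", then "completed"
        subject.OnNext(43); // ignored: the inner subscription was disposed

        // Empty case: the source completes without emitting,
        // so default(int) is produced instead.
        Observable.Empty<int>().FirstOrDefaultObservable().Subscribe(
            value => Console.WriteLine("default: " + value)); // prints "default: 0"
    }
}
```

Both subscriptions return immediately; nothing waits on the source, which is the point of the non-blocking variant.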