Created
July 4, 2014 10:40
-
-
Save andybrackley/13145bc6825c3c45bdf1 to your computer and use it in GitHub Desktop.
An Rx IObservable extension method that merges two streams together based on a key
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Reactive Extensions (Rx) helpers for combining observable sequences.
/// </summary>
public static class ObsExt
{
    // Adapted from this example:
    //   http://rxwiki.wikidot.com/101samples#toc39

    /// <summary>
    /// Correlates the elements of two observable sequences by key and projects
    /// every matching (outer, inner) pair into a result.
    /// </summary>
    /// <remarks>
    /// Both duration selectors passed to <c>GroupJoin</c> are
    /// <c>Observable.Never</c>, so no join window is ever closed: elements are
    /// retained for the lifetime of the subscription and the resulting
    /// sequence does not complete on its own. Memory use therefore grows with
    /// the number of elements observed.
    /// <para>
    /// The per-outer-element inner sequences are flattened with
    /// <c>SelectMany</c> instead of being subscribed to by hand, so that
    /// disposing the returned subscription also tears down every inner
    /// subscription, and so that errors from either source (or exceptions
    /// thrown by the selectors) are forwarded to the observer's
    /// <c>OnError</c> rather than escaping unobserved.
    /// </para>
    /// </remarks>
    /// <typeparam name="TOuter">Element type of the outer sequence.</typeparam>
    /// <typeparam name="TInner">Element type of the inner sequence.</typeparam>
    /// <typeparam name="TKey">Type of the key the two sequences are matched on.</typeparam>
    /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
    /// <param name="outer_">The outer (left) sequence.</param>
    /// <param name="inner_">The inner (right) sequence.</param>
    /// <param name="outerKeySelector_">Extracts the key from an outer element; evaluated once per outer element.</param>
    /// <param name="innerKeySelector_">Extracts the key from an inner element.</param>
    /// <param name="resultSelector_">Projects a matching outer/inner pair into a result.</param>
    /// <returns>A sequence of projected results for every key-matched pair.</returns>
    /// <exception cref="System.ArgumentNullException">Any argument is null.</exception>
    public static IObservable<TResult> JoinMerge<TOuter, TInner, TKey, TResult>
        (this IObservable<TOuter> outer_,
         IObservable<TInner> inner_,
         Func<TOuter, TKey> outerKeySelector_,
         Func<TInner, TKey> innerKeySelector_,
         Func<TOuter, TInner, TResult> resultSelector_)
    {
        if (outer_ == null)
        {
            throw new ArgumentNullException("outer_");
        }
        if (inner_ == null)
        {
            throw new ArgumentNullException("inner_");
        }
        if (outerKeySelector_ == null)
        {
            throw new ArgumentNullException("outerKeySelector_");
        }
        if (innerKeySelector_ == null)
        {
            throw new ArgumentNullException("innerKeySelector_");
        }
        if (resultSelector_ == null)
        {
            throw new ArgumentNullException("resultSelector_");
        }

        // The default comparer treats null keys as ordinary values instead of
        // throwing NullReferenceException the way key.Equals(other) would.
        var keyComparer = System.Collections.Generic.EqualityComparer<TKey>.Default;

        return outer_
            .GroupJoin(inner_,
                       _ => Observable.Never<TOuter>(),
                       _ => Observable.Never<TInner>(),
                       (ou_, in_) => new { Outer = ou_, Inner = in_ })
            .SelectMany(pair_ =>
            {
                // Compute the outer key once per outer element rather than
                // once per inner element tested against it.
                var outerKey = outerKeySelector_(pair_.Outer);

                return pair_.Inner
                    .Where(x_ => keyComparer.Equals(innerKeySelector_(x_), outerKey))
                    .Select(x_ => resultSelector_(pair_.Outer, x_));
            });
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
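There appears to be a bug in the disposal logic.
The returned disposable should dispose both the subscription to joinedStream and every inner subscription created for each outer element, not just the joinedStream subscription.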