@andybrackley
Created July 4, 2014 10:40
An Rx IObservable extension method that merges two streams together based on a key
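using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class ObsExt
{
    // Taken from this example here:
    // http://rxwiki.wikidot.com/101samples#toc39
    public static IObservable<TResult> JoinMerge<TOuter, TInner, TKey, TResult>
        (this IObservable<TOuter> outer_,
         IObservable<TInner> inner_,
         Func<TOuter, TKey> outerKeySelector_,
         Func<TInner, TKey> innerKeySelector_,
         Func<TOuter, TInner, TResult> resultSelector_)
    {
        // The duration selectors never fire, so every outer and inner element
        // stays "open" and each outer element sees all inner elements.
        var joinedStream =
            outer_.GroupJoin(inner_,
                _ => Observable.Never<TOuter>(),
                _ => Observable.Never<TInner>(),
                (ou_, in_) => new { Outer = ou_, Inner = in_ });
        return Observable.Create<TResult>(obs_ =>
        {
            // Holds the joinedStream subscription and every nested subscription,
            // so that disposing the result tears down all of them.
            var subscriptions = new CompositeDisposable();
            subscriptions.Add(
                joinedStream.Subscribe(inOut_ => subscriptions.Add(inOut_.Inner
                    // The default comparer matches keys without throwing on a null key.
                    .Where(x_ => EqualityComparer<TKey>.Default.Equals(
                        innerKeySelector_(x_), outerKeySelector_(inOut_.Outer)))
                    .Subscribe(merged_ =>
                        obs_.OnNext(resultSelector_(inOut_.Outer, merged_))))));
            return subscriptions;
        });
    }
}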
@andybrackley
Author

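Looks like there is a bug with the Dispose in the code as originally posted.
The value returned from Observable.Create only disposes the subscription to joinedStream; the nested subscriptions to each inOut_.Inner are discarded and never disposed. It should dispose both the joinedStream subscription and the nested inner subscriptions.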
