Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Reactive Observable Extensions for subscribe/Dispose
using System;
using System.Reactive.Disposables;
namespace Reactive.Samples
{
// For details, see the following post:
// http://ajkalra.blogspot.com/2012/04/subscription-snooping-in-rx.html
//
/// <summary>
/// Extension methods that attach callbacks to the subscription lifecycle of an
/// <see cref="IObservable{T}"/>: immediately before and after the source is
/// subscribed, and immediately before and after the resulting subscription is disposed.
/// </summary>
/// <remarks>
/// The dispose callbacks are invoked from the <see cref="IDisposable"/> returned by
/// <c>Subscribe</c>; they run when that handle is disposed.
/// </remarks>
public static class ObservableExtensions
{
    /// <summary>
    /// Shared do-nothing callback used for the lifecycle hooks a caller did not supply.
    /// </summary>
    private static readonly Action NoOp = () => { };

    /// <summary>
    /// Calls the action BEFORE the subscription to the source takes place.
    /// </summary>
    /// <typeparam name="TSource">The type of the source.</typeparam>
    /// <param name="source">Observable being extended.</param>
    /// <param name="onSubscribing">Action to be called when about to subscribe.</param>
    /// <returns>An observable that wraps <paramref name="source"/> and invokes the action on each subscribe.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="onSubscribing"/> is null.
    /// </exception>
    public static IObservable<TSource> OnSubscribing<TSource>(this IObservable<TSource> source,
                                                              Action onSubscribing)
    {
        EnsureNotNull(source, "source");
        EnsureNotNull(onSubscribing, "onSubscribing");

        return new StateObservable<TSource>(source, onSubscribing, NoOp, NoOp, NoOp);
    }

    /// <summary>
    /// Calls the action AFTER the subscription to the source takes place.
    /// </summary>
    /// <typeparam name="TSource">The type of the source.</typeparam>
    /// <param name="source">The source.</param>
    /// <param name="onSubscribed">Action to be executed after the source observable has been subscribed.</param>
    /// <returns>An observable that wraps <paramref name="source"/> and invokes the action on each subscribe.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="onSubscribed"/> is null.
    /// </exception>
    public static IObservable<TSource> OnSubscribed<TSource>(this IObservable<TSource> source,
                                                             Action onSubscribed)
    {
        EnsureNotNull(source, "source");
        EnsureNotNull(onSubscribed, "onSubscribed");

        return new StateObservable<TSource>(source, NoOp, onSubscribed, NoOp, NoOp);
    }

    /// <summary>
    /// Calls the action BEFORE the subscription is disposed.
    /// </summary>
    /// <typeparam name="TSource">The type of the source.</typeparam>
    /// <param name="source">The source.</param>
    /// <param name="onDisposing">Action to be executed when the subscription is about to be disposed.</param>
    /// <returns>An observable that wraps <paramref name="source"/> and invokes the action on dispose.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="onDisposing"/> is null.
    /// </exception>
    public static IObservable<TSource> OnDisposing<TSource>(this IObservable<TSource> source,
                                                            Action onDisposing)
    {
        EnsureNotNull(source, "source");
        EnsureNotNull(onDisposing, "onDisposing");

        return new StateObservable<TSource>(source, NoOp, NoOp, onDisposing, NoOp);
    }

    /// <summary>
    /// Calls the action AFTER the subscription is disposed.
    /// </summary>
    /// <typeparam name="TSource">The type of the source.</typeparam>
    /// <param name="source">The source.</param>
    /// <param name="onDisposed">Action to be executed after the subscription has been disposed.</param>
    /// <returns>An observable that wraps <paramref name="source"/> and invokes the action on dispose.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="onDisposed"/> is null.
    /// </exception>
    public static IObservable<TSource> OnDisposed<TSource>(this IObservable<TSource> source,
                                                           Action onDisposed)
    {
        EnsureNotNull(source, "source");
        EnsureNotNull(onDisposed, "onDisposed");

        return new StateObservable<TSource>(source, NoOp, NoOp, NoOp, onDisposed);
    }

    /// <summary>
    /// Calls one action before the subscription is created and another after it is disposed.
    /// </summary>
    /// <typeparam name="TSource">The type of the source.</typeparam>
    /// <param name="source">The source.</param>
    /// <param name="onSubscribing">Action to be called when about to subscribe.</param>
    /// <param name="onDisposed">Action to be executed after the subscription has been disposed.</param>
    /// <returns>An observable that wraps <paramref name="source"/> and invokes the actions.</returns>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public static IObservable<TSource> OnSubscribeDispose<TSource>(this IObservable<TSource> source,
                                                                   Action onSubscribing,
                                                                   Action onDisposed)
    {
        EnsureNotNull(source, "source");
        EnsureNotNull(onSubscribing, "onSubscribing");
        EnsureNotNull(onDisposed, "onDisposed");

        return new StateObservable<TSource>(source, onSubscribing, NoOp, NoOp, onDisposed);
    }

    /// <summary>
    /// Calls the given actions before/after the subscription is created and before/after it is disposed.
    /// </summary>
    /// <typeparam name="TSource">The type of the source.</typeparam>
    /// <param name="source">The source.</param>
    /// <param name="onSubscribing">Action to be called when about to subscribe.</param>
    /// <param name="onSubscribed">Action to be executed after the source observable has been subscribed.</param>
    /// <param name="onDisposing">Action to be executed when the subscription is about to be disposed.</param>
    /// <param name="onDisposed">Action to be executed after the subscription has been disposed.</param>
    /// <returns>An observable that wraps <paramref name="source"/> and invokes the actions.</returns>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public static IObservable<TSource> WhenSubscribedOrDisposed<TSource>(this IObservable<TSource> source,
                                                                         Action onSubscribing,
                                                                         Action onSubscribed,
                                                                         Action onDisposing,
                                                                         Action onDisposed)
    {
        EnsureNotNull(source, "source");
        EnsureNotNull(onSubscribing, "onSubscribing");
        EnsureNotNull(onSubscribed, "onSubscribed");
        EnsureNotNull(onDisposing, "onDisposing");
        EnsureNotNull(onDisposed, "onDisposed");

        return new StateObservable<TSource>(source, onSubscribing, onSubscribed, onDisposing, onDisposed);
    }

    /// <summary>
    /// Throws <see cref="ArgumentNullException"/> naming <paramref name="parameterName"/>
    /// when <paramref name="value"/> is null.
    /// </summary>
    private static void EnsureNotNull(object value, string parameterName)
    {
        if (value == null)
        {
            throw new ArgumentNullException(parameterName);
        }
    }

    /// <summary>
    /// Observable wrapper that forwards subscriptions to an inner source and invokes
    /// the configured callbacks around subscribe and around dispose.
    /// </summary>
    private sealed class StateObservable<T> : IObservable<T>
    {
        private readonly IObservable<T> _source;
        private readonly Action _beforeSub;
        private readonly Action _afterSub;
        private readonly Action _beforeDispose;
        private readonly Action _afterDispose;

        /// <summary>
        /// Stores the wrapped source and the four lifecycle callbacks.
        /// Arguments are not validated here; the public extension methods do that.
        /// </summary>
        public StateObservable(
            IObservable<T> source,
            Action beforeSub,
            Action afterSub,
            Action beforeDispose,
            Action afterDispose)
        {
            _source = source;
            _beforeSub = beforeSub;
            _afterSub = afterSub;
            _beforeDispose = beforeDispose;
            _afterDispose = afterDispose;
        }

        /// <summary>
        /// Runs the before-subscribe callback, subscribes <paramref name="observer"/> to the
        /// source, runs the after-subscribe callback, and returns a handle whose disposal
        /// runs the before-dispose callback, disposes the source subscription, and then
        /// runs the after-dispose callback.
        /// </summary>
        /// <param name="observer">Observer to subscribe to the wrapped source.</param>
        /// <returns>Handle that tears down the source subscription when disposed.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            // Reject an invalid call before any callback side effect is triggered.
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            _beforeSub();
            var subscription = _source.Subscribe(observer);

            try
            {
                _afterSub();
            }
            catch
            {
                // The caller never receives a handle when this callback throws, so the
                // source subscription must be released here or it would be leaked.
                subscription.Dispose();
                throw;
            }

            return Disposable.Create(
                () =>
                {
                    try
                    {
                        _beforeDispose();
                    }
                    finally
                    {
                        // Release the source subscription even if the callback throws:
                        // this action is expected to run only once, so a skipped dispose
                        // could not be retried through the same handle.
                        subscription.Dispose();
                    }

                    _afterDispose();
                });
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.