Create a gist now

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Pausable IObservable stream
public static class ObservableExtensions
{
/// <summary>
/// Creates a stream that can be paused and resumed. Whilst paused any updates to the source stream will be saved in a replay stream.
/// Once the stream is resumed, the items saved in the replay stream will be published to the ouput stream.
/// The stream can be paused and resumed by publishing a bool to the isPausedStream - true to pause, false to resume.
/// By default the stream starts as paused, but this can be overridden by setting the isPausedOnStart parameter.
/// </summary>
/// <typeparam name="T">The object that provides notification information.This type parameter is covariant. That is, you can use either the type you specified or any type that is more derived. For more information about covariance and contravariance, see Covariance and Contravariance in Generics.</typeparam>
/// <param name="sourceStream">The stream to read from and either republish or save whilst the stream is paused.</param>
/// <param name="isPausedStream">The stream to publish bools to to pause and resume the stream.</param>
/// <param name="isPausedOnStart">The default state of the stream - true for paused, false for resumed. Default is true (paused)</param>
/// <returns>A stream that items are published to when not paused.</returns>
public static IObservable<T> Pausable<T>(this IObservable<T> sourceStream, IObservable<bool> isPausedStream, bool isPausedOnStart = true)
{
return Observable.Create<T>(o =>
{
var subscriptions = new SerialDisposable();
var replaySubjects = new SerialDisposable();
var subscription = sourceStream.Publish(stream =>
{
Func<ReplaySubject<T>> replaySubjectFactory = () =>
{
var rs = new ReplaySubject<T>();
replaySubjects.Disposable = rs;
subscriptions.Disposable = stream.Subscribe(rs);
return rs;
};
var replaySubject = replaySubjectFactory();
Func<bool, IObservable<T>> switcher = isPaused =>
{
if (isPaused)
{
replaySubject = replaySubjectFactory();
return Observable.Empty<T>();
}
return replaySubject.Concat(stream);
};
return isPausedStream
.StartWith(isPausedOnStart)
.DistinctUntilChanged()
.Select(switcher)
.Switch();
}).Subscribe(o);
return new CompositeDisposable(subscription, subscriptions);
});
}
}
@jimbobbennett

This comment has been minimized.

Show comment
Hide comment
@jimbobbennett

jimbobbennett Jun 24, 2015

Code to create a stream that by default is paused - i.e. it saves items sent to the stream in a ReplaySubject. Once the stream is resumed, the saved items are replayed. The stream can be paused/resumed by sending bools to the isPausedStream

Owner

jimbobbennett commented Jun 24, 2015

Code to create a stream that by default is paused - i.e. it saves items sent to the stream in a ReplaySubject. Once the stream is resumed, the saved items are replayed. The stream can be paused/resumed by sending bools to the isPausedStream

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment