@jimbobbennett
Last active July 29, 2023 01:41
Pausable IObservable stream
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ObservableExtensions
{
    /// <summary>
    /// Creates a stream that can be paused and resumed. Whilst paused, any items published by the source stream are saved in a replay stream.
    /// Once the stream is resumed, the items saved in the replay stream are published to the output stream.
    /// The stream can be paused and resumed by publishing a bool to the isPausedStream - true to pause, false to resume.
    /// By default the stream starts as paused, but this can be overridden by setting the isPausedOnStart parameter.
    /// </summary>
    /// <typeparam name="T">The type of the items in the source stream.</typeparam>
    /// <param name="sourceStream">The stream to read from and either republish or save whilst the stream is paused.</param>
    /// <param name="isPausedStream">The stream to publish bools to in order to pause and resume the stream.</param>
    /// <param name="isPausedOnStart">The initial state of the stream - true for paused, false for resumed. Default is true (paused).</param>
    /// <returns>A stream that items are published to when not paused.</returns>
    public static IObservable<T> Pausable<T>(this IObservable<T> sourceStream, IObservable<bool> isPausedStream, bool isPausedOnStart = true)
    {
        return Observable.Create<T>(o =>
        {
            // Hold the current buffer subscription and the current replay subject;
            // assigning a new value disposes the previous one.
            var subscriptions = new SerialDisposable();
            var replaySubjects = new SerialDisposable();

            // Publish shares a single subscription to the source between the
            // replay subject and the output stream.
            var subscription = sourceStream.Publish(stream =>
            {
                // Creates a fresh replay subject and starts recording the source into it.
                Func<ReplaySubject<T>> replaySubjectFactory = () =>
                {
                    var rs = new ReplaySubject<T>();
                    replaySubjects.Disposable = rs;
                    subscriptions.Disposable = stream.Subscribe(rs);
                    return rs;
                };

                var replaySubject = replaySubjectFactory();

                Func<bool, IObservable<T>> switcher = isPaused =>
                {
                    if (isPaused)
                    {
                        // Start a new buffer and emit nothing whilst paused.
                        replaySubject = replaySubjectFactory();
                        return Observable.Empty<T>();
                    }

                    // On resume, replay the buffered items, then continue with live items.
                    return replaySubject.Concat(stream);
                };

                return isPausedStream
                    .StartWith(isPausedOnStart)
                    .DistinctUntilChanged()
                    .Select(switcher)
                    .Switch();
            }).Subscribe(o);

            return new CompositeDisposable(subscription, subscriptions);
        });
    }
}
@jimbobbennett
Author

Code to create a stream that is paused by default, i.e. it saves items sent to the stream in a ReplaySubject. Once the stream is resumed, the saved items are replayed. The stream can be paused and resumed by sending bools to the isPausedStream (true to pause, false to resume).
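
A minimal usage sketch of the pause/resume behaviour described above. It assumes the `ObservableExtensions` class from this gist is compiled into the same project and that the System.Reactive package is referenced. The `PausableDemo` class, the `Run` method and the `source` / `isPaused` subjects are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical demo class; relies on the Pausable extension method from this gist.
public static class PausableDemo
{
    public static List<string> Run()
    {
        var received = new List<string>();
        var source = new Subject<int>();     // items to be paused/resumed
        var isPaused = new Subject<bool>();  // true = pause, false = resume

        // isPausedOnStart defaults to true, so the stream starts paused.
        using (source.Pausable(isPaused).Subscribe(x => received.Add("item " + x)))
        {
            source.OnNext(1);        // paused: saved in the ReplaySubject
            source.OnNext(2);        // paused: saved in the ReplaySubject
            isPaused.OnNext(false);  // resume: the saved items are replayed
            source.OnNext(3);        // resumed: passed straight through
            isPaused.OnNext(true);   // pause again: a new ReplaySubject starts saving
            source.OnNext(4);        // paused: saved
            isPaused.OnNext(false);  // resume: the saved item is replayed
        }

        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Subjects are used here only because they make the timing easy to follow; any `IObservable<bool>` (for example, one derived from a UI toggle) should be able to drive `isPausedStream` in the same way. Passing `isPausedOnStart: false` should let items flow immediately until the first `true` arrives.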
