Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Pause an Rx (Reactive Extensions) stream
/// <summary>
/// Relays <paramref name="sourceStream"/> to the subscriber, but lets
/// <paramref name="isPausedStream"/> pause and resume delivery. Values the
/// source produces while paused are buffered in a replay subject and are
/// delivered when the stream is resumed.
/// </summary>
/// <typeparam name="T">Element type of the source sequence.</typeparam>
/// <param name="sourceStream">The sequence to relay.</param>
/// <param name="isPausedStream">
/// Emits <c>true</c> to pause and <c>false</c> to resume. Consecutive
/// identical values are ignored.
/// </param>
/// <param name="startPaused">
/// Pause state to assume before <paramref name="isPausedStream"/> emits.
/// </param>
/// <returns>An observable that mirrors the source, subject to pausing.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="sourceStream"/> or <paramref name="isPausedStream"/> is <c>null</c>.
/// </exception>
public static IObservable<T> Pausable<T>(
    this IObservable<T> sourceStream,
    IObservable<bool> isPausedStream,
    bool startPaused = false)
{
    if (sourceStream == null)
    {
        throw new ArgumentNullException("sourceStream");
    }

    if (isPausedStream == null)
    {
        throw new ArgumentNullException("isPausedStream");
    }

    return Observable.Create<T>(o =>
    {
        // Holds the subscription that feeds the current replay subject from the published source.
        var subscriptions = new SerialDisposable();

        // Holds the current replay subject; assigning a new one disposes the previous one.
        var replaySubjects = new SerialDisposable();

        // Publish shares one subscription to the source between the replay subject and the output.
        var subscription = sourceStream.Publish(stream =>
        {
            // Creates a fresh buffer and re-points the source feed at it.
            // NOTE(review): the parameterless ReplaySubject has no buffer limit and values keep
            // flowing through it after resume, so it retains everything received since the
            // last pause - confirm this memory profile is acceptable for long-running streams.
            Func<ReplaySubject<T>> replaySubjectFactory = () =>
            {
                var rs = new ReplaySubject<T>();
                replaySubjects.Disposable = rs;
                subscriptions.Disposable = stream.Subscribe(rs);
                return rs;
            };

            var replaySubject = replaySubjectFactory();

            Func<bool, IObservable<T>> switcher = isPaused =>
            {
                if (isPaused)
                {
                    // Start a new buffer for this pause period and emit nothing downstream.
                    replaySubject = replaySubjectFactory();
                    return Observable.Empty<T>();
                }

                // The replay subject first replays what it buffered and then keeps
                // relaying the live values it receives from the published stream.
                return replaySubject.Concat(stream);
            };

            return isPausedStream
                .StartWith(startPaused)
                .DistinctUntilChanged()
                .Select(switcher)
                .Switch();
        }).Subscribe(o);

        // Include replaySubjects so the last replay subject (and its buffer) is released
        // on unsubscription; previously it was never disposed.
        return new CompositeDisposable(subscription, subscriptions, replaySubjects);
    });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment