Skip to content

Instantly share code, notes, and snippets.

@SuperJMN
Created January 20, 2016 19:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save SuperJMN/8e5cdac40117a4bcec4c to your computer and use it in GitHub Desktop.
Save SuperJMN/8e5cdac40117a4bcec4c to your computer and use it in GitHub Desktop.
StackOverflow
/// <summary>
/// Demo entry point: subscribes to a one-second interval through
/// <c>Pausable</c>, waits ten seconds, signals the pauser, then waits ten
/// more seconds before tearing everything down.
/// </summary>
void Main()
{
    var xs = Observable.Interval(TimeSpan.FromSeconds(1));

    // Both the pauser subject and the subscription are disposed when the
    // demo finishes, so the interval timer does not keep running after
    // Main returns. The subscription (inner using) is released first.
    using (var bs = new Subject<bool>())
    using (xs.Pausable(bs).Subscribe(x => Debug.WriteLine(x)))
    {
        Thread.Sleep(10000);

        // NOTE(review): Pausable seeds the pauser with `true` and applies
        // DistinctUntilChanged, so this `true` looks like a no-op and no
        // values would ever be written. `false` is presumably what resumes
        // the stream - confirm the intended demo behaviour.
        bs.OnNext(true);

        Thread.Sleep(10000);
    }
}
/// <summary>
/// Extension methods for observable sequences.
/// </summary>
public static class Extensions
{
    /// <summary>
    /// Returns a sequence that buffers the items of <paramref name="source"/>
    /// while paused and emits them (followed by live items) once resumed.
    /// </summary>
    /// <remarks>
    /// The pauser is seeded with <c>true</c>, so the sequence starts paused,
    /// and consecutive identical pauser signals are ignored. A <c>true</c>
    /// signal starts a fresh buffer; a <c>false</c> signal replays the
    /// current buffer and keeps forwarding items.
    /// </remarks>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence whose items are buffered or forwarded.</param>
    /// <param name="pauser">Emits <c>true</c> to pause and <c>false</c> to resume.</param>
    /// <returns>The pausable view over <paramref name="source"/>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="pauser"/> is <c>null</c>.
    /// </exception>
    public static IObservable<T> Pausable<T>(
        this IObservable<T> source,
        IObservable<bool> pauser)
    {
        // Fail at the call site instead of later, at subscribe time, inside Rx.
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        if (pauser == null)
        {
            throw new ArgumentNullException("pauser");
        }

        return Observable.Create<T>(o =>
        {
            // Holds the subscription that feeds the current buffer.
            var paused = new SerialDisposable();

            // Publish lets the buffer and the resumed stream share a single
            // subscription to the source.
            var subscription = Observable.Publish(source, ps =>
            {
                var values = new ReplaySubject<T>();

                Func<bool, IObservable<T>> switcher = pause =>
                {
                    if (!pause)
                    {
                        // Resume: replay what was buffered. The buffer stays
                        // subscribed to ps (via `paused`), so live items keep
                        // flowing through it; ps is only concatenated after
                        // the buffer completes.
                        // NOTE(review): the unbounded ReplaySubject therefore
                        // appears to keep accumulating items while resumed,
                        // until the next pause - confirm this is acceptable
                        // for long-running sources.
                        return values.Concat(ps);
                    }

                    // Detach the old buffer from the source BEFORE disposing
                    // it. Disposing first leaves a window in which a source
                    // item is pushed into a disposed subject, which throws
                    // ObjectDisposedException on the producer's thread. An
                    // item already in flight can still hit that window, so
                    // this narrows the race rather than removing it.
                    paused.Disposable = null;
                    values.Dispose();

                    // Start a fresh buffer for the new paused period.
                    values = new ReplaySubject<T>();
                    paused.Disposable = ps.Subscribe(values);
                    return Observable.Empty<T>();
                };

                return pauser.StartWith(true).DistinctUntilChanged()
                    .Select(switcher)
                    .Switch();
            }).Subscribe(o);

            return new CompositeDisposable(subscription, paused);
        });
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment