-
-
Save noseratio/fe41f544d4a4a42db69e2238bec4c86c to your computer and use it in GitHub Desktop.
ReactiveX: do something when the first item in the sequence is emitted or the sequence is terminated
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
// Do something when the first item in the sequence is emitted
// or the sequence is terminated.
// Demo script: five ways to run a side effect ("Done") once the first item of
// an observable arrives or the sequence terminates, each bounded by a
// cancellation token that fires after timeoutMs.

// Every demo cancels after this many milliseconds.
const int timeoutMs = 2500;

// Source used by every demo: a 1000 ms interval whose ticks are echoed to the
// console, with the first three ticks skipped before anything is forwarded.
// With these timings the timeout elapses before the first forwarded item.
var sequence = Observable
    .Interval(TimeSpan.FromMilliseconds(1000))
    .Do(tick => Console.WriteLine(tick))
    .Skip(3);

// Side effect attached through Finally(...) in each demo below.
static void Done()
{
    Console.WriteLine("Done");
}

// 1. RunAsync: subscribes immediately and observes the token; the returned
//    object is disposed when the block ends.
Console.WriteLine("RunAsync");
{
    using var timeoutCts = new CancellationTokenSource(timeoutMs);
    using var run = sequence
        .FirstAsync()
        .Finally(Done)
        .RunAsync(timeoutCts.Token);

    Console.ReadLine();
}

// 2. ToTask: same idea, but the result is surfaced as a Task (discarded here).
Console.WriteLine("ToTask");
{
    using var timeoutCts = new CancellationTokenSource(timeoutMs);
    _ = sequence
        .FirstAsync()
        .Finally(Done)
        .ToTask(timeoutCts.Token);

    Console.ReadLine();
}

// 3. TakeUntil + Publish/Connect: the token is turned into an observable that
//    terminates the pipeline; Connect() starts it without any subscriber.
Console.WriteLine("TakeUntil, Publish, Connect");
{
    using var timeoutCts = new CancellationTokenSource(timeoutMs);

    // or: Task.Delay(Timeout.Infinite, timeoutCts.Token).ToObservable()
    var cancellation = timeoutCts.Token.ToObservable<long>();

    using var connection = sequence
        .FirstAsync()
        .TakeUntil(cancellation)
        .Finally(Done)
        .Publish()
        .Connect();

    Console.ReadLine();
}

// 4. Subscribe with an observer built from a single Action<Notification<long>>.
Console.WriteLine("Subscribe via Action<Notification>");
{
    using var timeoutCts = new CancellationTokenSource(timeoutMs);
    var notificationPrinter = Observer.ToObserver<long>(
        notification => Console.WriteLine(notification));

    sequence
        .FirstAsync()
        .Finally(Done)
        .Subscribe(notificationPrinter, timeoutCts.Token);

    Console.ReadLine();
}

// 5. Subscribe with separate onNext / onError / onCompleted callbacks.
Console.WriteLine("Subscribe via onNext/onError/onCompleted");
{
    using var timeoutCts = new CancellationTokenSource(timeoutMs);

    sequence
        .FirstAsync()
        .Finally(Done)
        .Subscribe(
            onNext: value => Console.WriteLine(value),
            onError: error => Console.WriteLine(error),
            onCompleted: () => Console.WriteLine("completed"),
            timeoutCts.Token);

    Console.ReadLine();
}
/// <summary>
/// Reactive helpers for <see cref="CancellationToken"/>.
/// </summary>
public static class RxExt
{
    /// <summary>
    /// Exposes a cancellation token as an observable that never produces a
    /// value and fails with <see cref="OperationCanceledException"/> once the
    /// token is cancelled.
    /// </summary>
    /// <typeparam name="T">Element type of the returned sequence.</typeparam>
    /// <param name="this">The token to observe.</param>
    /// <returns>
    /// An immediately failing sequence if the token is already cancelled when
    /// this method is called; a sequence that never signals if the token can
    /// never be cancelled; otherwise a sequence that signals <c>OnError</c>
    /// from the token's cancellation callback. Disposing a subscription
    /// disposes the underlying token registration.
    /// </returns>
    public static IObservable<T> ToObservable<T>(this CancellationToken @this)
    {
        // Checked once, at call time (not per subscription).
        if (@this.IsCancellationRequested)
        {
            return Observable.Throw<T>(new OperationCanceledException(@this));
        }

        // A token that can never fire maps to a sequence that never signals;
        // otherwise each subscriber gets its own registration on the token.
        return @this.CanBeCanceled
            ? Observable.Create<T>(
                observer => @this.Register(
                    (_, cancelledToken) => observer.OnError(new OperationCanceledException(cancelledToken)),
                    null))
            : Observable.Never<T>();
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment