Skip to content

Instantly share code, notes, and snippets.

@noseratio
Created May 30, 2022 11:08
Show Gist options
  • Save noseratio/fe41f544d4a4a42db69e2238bec4c86c to your computer and use it in GitHub Desktop.
Save noseratio/fe41f544d4a4a42db69e2238bec4c86c to your computer and use it in GitHub Desktop.
ReactiveX: do something when the first item in the sequence is emitted or the sequence is terminated
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
// Do something when the first item in the sequence is emitted
// or the sequence is terminated.
// Shared source for every scenario below: a 1000 ms interval whose values are
// echoed to the console by Do(...), with the first three values dropped by
// Skip(3) before they can reach a subscriber.
var ticks = Observable.Interval(TimeSpan.FromMilliseconds(1000));
var sequence = ticks
    .Do(tick => Console.WriteLine(tick))
    .Skip(3);
// Callback handed to Finally(...) in each scenario; it only reports that it ran.
static void Done()
{
    Console.WriteLine("Done");
}
// Scenario: RunAsync driven by a token that is scheduled to cancel after 2500 ms.
Console.WriteLine("RunAsync");
{
    using var cts = new CancellationTokenSource(2500);

    var firstItem = sequence.FirstAsync().Finally(Done);

    // The object returned by RunAsync is disposed when this block exits,
    // i.e. once a line has been read from the console.
    using var subject = firstItem.RunAsync(cts.Token);

    Console.ReadLine();
}
// Scenario: ToTask driven by a token that is scheduled to cancel after 2500 ms.
Console.WriteLine("ToTask");
{
    using var cts = new CancellationTokenSource(2500);

    var firstItem = sequence.FirstAsync().Finally(Done);

    // The resulting task is deliberately discarded; only the console output matters here.
    _ = firstItem.ToTask(cts.Token);

    Console.ReadLine();
}
// Scenario: the token is turned into an observable and used as the TakeUntil
// signal; the pipeline is started with Publish().Connect() instead of Subscribe.
Console.WriteLine("TakeUntil, Publish, Connect");
{
    using var cts = new CancellationTokenSource(2500);

    var firstItem = sequence.FirstAsync();

    // or: Task.Delay(Timeout.Infinite, cts.Token).ToObservable()
    var cancelled = cts.Token.ToObservable<long>();

    var connectable = firstItem
        .TakeUntil(cancelled)
        .Finally(Done)
        .Publish();

    // The connection is disposed when this block exits, after the console read.
    using var connection = connectable.Connect();

    Console.ReadLine();
}
// Scenario: Subscribe(observer, token) where the observer is built from a
// single callback that receives each event as a Notification<long>.
Console.WriteLine("Subscribe via Action<Notification>");
{
    using var cts = new CancellationTokenSource(2500);

    var firstItem = sequence.FirstAsync().Finally(Done);

    // Every notification handed to the callback is written to the console as-is.
    var observer = Observer.ToObserver<long>(
        notification => Console.WriteLine(notification));

    firstItem.Subscribe(observer, cts.Token);

    Console.ReadLine();
}
// Scenario: Subscribe with separate onNext / onError / onCompleted callbacks
// and a token that is scheduled to cancel after 2500 ms.
Console.WriteLine("Subscribe via onNext/onError/onCompleted");
{
    using var cts = new CancellationTokenSource(2500);

    var firstItem = sequence.FirstAsync().Finally(Done);

    // Arguments are, in order: onNext, onError, onCompleted, cancellation token.
    firstItem.Subscribe(
        value => Console.WriteLine(value),
        error => Console.WriteLine(error),
        () => Console.WriteLine("completed"),
        cts.Token);

    Console.ReadLine();
}
/// <summary>
/// Extension helpers that bridge <see cref="CancellationToken"/> into observable pipelines.
/// </summary>
public static class RxExt
{
    /// <summary>
    /// Exposes a cancellation token as an observable that never produces a value and
    /// fails with <see cref="OperationCanceledException"/> once the token is cancelled.
    /// </summary>
    /// <typeparam name="T">Element type of the returned sequence; no element is ever emitted.</typeparam>
    /// <param name="this">The token to observe.</param>
    /// <returns>
    /// An immediately failing sequence when the token is already cancelled; a sequence that
    /// never terminates when the token cannot be cancelled; otherwise a sequence that
    /// registers a cancellation callback for each subscription and hands the registration
    /// back as the subscription's disposable.
    /// </returns>
    public static IObservable<T> ToObservable<T>(this CancellationToken @this) =>
        @this switch
        {
            // Already cancelled: no need to register anything.
            { IsCancellationRequested: true } =>
                Observable.Throw<T>(new OperationCanceledException(@this)),

            // A token that can never be cancelled will never signal.
            { CanBeCanceled: false } =>
                Observable.Never<T>(),

            // Live token: forward cancellation to the observer as an error.
            _ => Observable.Create<T>(observer =>
            {
                IDisposable registration = @this.Register(
                    (_, cancelledToken) => observer.OnError(new OperationCanceledException(cancelledToken)),
                    null);
                return registration;
            }),
        };
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment