Skip to content

Instantly share code, notes, and snippets.

@moswald
Created October 14, 2015 15:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save moswald/568aec27b60b40b2177e to your computer and use it in GitHub Desktop.
Save moswald/568aec27b60b40b2177e to your computer and use it in GitHub Desktop.
TakeUntil wasn't working where I originally had it.
namespace TakeUntil
{
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
/// <summary>
/// Console demo of a timer-driven Rx worker with back-pressure that is shut down
/// by a cancel signal, allowing a short grace period for in-flight work.
/// </summary>
static class Program
{
    /// <summary>
    /// Starts the worker, waits for a key press, raises the cancel signal and then
    /// blocks until the worker has shut down (or the grace period has expired).
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var cancelSignal = new Subject<Unit>();
        var running = RunAsync(cancelSignal, TaskPoolScheduler.Default);

        Console.WriteLine("Press any key to stop.");
        Console.ReadKey(true);

        cancelSignal.OnNext(Unit.Default);

        // Main is synchronous, so we have to block here. GetAwaiter().GetResult()
        // rethrows the original exception (e.g. the grace-period timeout) instead of
        // wrapping it in an AggregateException the way Wait() does.
        running.GetAwaiter().GetResult();
    }

    /// <summary>
    /// Runs the worker once per timer tick until <paramref name="cancelSignal"/> fires.
    /// </summary>
    /// <param name="cancelSignal">Emits a value when the worker should stop.</param>
    /// <param name="scheduler">Scheduler on which the one-second interval runs.</param>
    /// <returns>
    /// A task that completes when the worker pipeline has completed, or faults when
    /// the two-second grace period that starts on cancellation expires first.
    /// </returns>
    static async Task RunAsync(IObservable<Unit> cancelSignal, IScheduler scheduler)
    {
        // true  = the worker is idle and may accept the next tick
        // false = the worker is busy; ticks are held back by the Zip below
        var backpressure = new BehaviorSubject<bool>(true);

        var timer = Observable.Interval(TimeSpan.FromSeconds(1), scheduler)
            // stops the interval source itself once cancellation is requested
            .TakeUntil(cancelSignal)
            // pairs each tick with one "idle" notification so ticks cannot overtake the worker
            .Zip(backpressure.Where(b => b), (a, b) => a)
            .Publish();

        var run = timer
            // Terminating only the interval (above) was observed not to complete this
            // pipeline, so the cancel signal must also be applied downstream of the Zip
            // for the worker sequence to complete.
            .TakeUntil(cancelSignal)
            .Do(_ => backpressure.OnNext(false))
            .Select(
                _ =>
                {
                    //
                    // do interesting stuff here that sometimes takes more than one second to complete
                    //
                    backpressure.OnNext(true);
                    return Unit.Default;
                })
            .IgnoreElements();

        // after cancellation, starts a timer that will eventually throw a timeout exception if allowed to run to completion
        var cancelSignalGracePeriod = cancelSignal.SelectMany(
            _ => Observable.Never<Unit>().Timeout(TimeSpan.FromSeconds(2)));

        // Dispose the connection when we leave, whichever way the await ends.
        using (timer.Connect())
        {
            // Listen to both the worker and the grace period; neither produces a value, so
            // Amb simply follows whichever terminates first:
            //  - the worker completing => normal completion
            //  - the grace period expiring => the timeout error is propagated to the caller
            //
            // Neither input may emit an element up front (e.g. via StartWith): Amb commits
            // to the first sequence that reacts and unsubscribes from the other, which would
            // silently disable the grace period. Awaiting an empty sequence throws, so the
            // placeholder element is supplied after Amb has made its choice.
            await Observable.Amb(run, cancelSignalGracePeriod).DefaultIfEmpty(Unit.Default);
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment