Skip to content

Instantly share code, notes, and snippets.

@runceel
Created September 18, 2014 12:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save runceel/ffccfb60c6c451fa4417 to your computer and use it in GitHub Desktop.
Save runceel/ffccfb60c6c451fa4417 to your computer and use it in GitHub Desktop.
using Codeplex.Reactive.Extensions;
using Codeplex.Reactive.Notifiers;
using System;
using System.Diagnostics;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace ConsoleApplication6
{
/// <summary>
/// Console driver: pushes a sample value into a subject on every key press and
/// prints a line whenever the monitor reports a timeout or an error.
/// </summary>
class Program
{
    /// <summary>
    /// Wires a <see cref="MitsubaObservable{T}"/> to a subject and then loops forever,
    /// emitting <c>1.0</c> after each key press.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        var input = new Subject<double>();
        var monitor = new MitsubaObservable<double>(input);

        // Subscribe to errors first, then timeouts, as console reporters.
        Action<Unit> reportError = _ => Console.WriteLine("Error");
        Action<Unit> reportTimeout = _ => Console.WriteLine("Timeout");
        monitor.Errors.Subscribe(reportError);
        monitor.Timeouts.Subscribe(reportTimeout);

        // Never terminates: each key press feeds one value into the monitored stream.
        for (;;)
        {
            Console.ReadKey();
            input.OnNext(1.0);
        }
    }
}
/// <summary>
/// Monitors an observable sequence for inactivity. A one-second ticker increments
/// a time counter (limit <c>TimeoutSecond</c>); every value received from the source
/// disposes the handles collected from those increments. While the time counter sits
/// at its limit, each tick also increments an error counter (limit <c>ErrorLimit</c>).
/// </summary>
/// <typeparam name="T">Element type of the monitored sequence; the values themselves are ignored.</typeparam>
class MitsubaObservable<T> : IDisposable
{
    // Guards reads and replacement of timeCountDisposable, which is accessed from both
    // the ticker callback and the source callback (these may run on different threads).
    private readonly object Sync = new object();
    private const int TimeoutSecond = 10;
    private const int ErrorLimit = 30;

    // Holds the ticker and source subscriptions for the lifetime of this instance.
    private readonly CompositeDisposable disposable = new CompositeDisposable();
    private readonly CountNotifier timeCount = new CountNotifier(TimeoutSecond);
    private readonly CountNotifier errorCount = new CountNotifier(ErrorLimit);

    // Collects the handles returned by timeCount.Increment() since the last source value.
    // NOTE(review): disposing such a handle is expected to roll the increment back
    // (CountNotifier contract) - confirm against the library version in use.
    private CompositeDisposable timeCountDisposable = new CompositeDisposable();

    /// <summary>
    /// Starts the one-second ticker and subscribes to <paramref name="source"/>.
    /// </summary>
    /// <param name="source">Sequence whose activity is monitored.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public MitsubaObservable(IObservable<T> source)
    {
        // Validate before starting the ticker: otherwise a null source would throw
        // after the interval subscription is already running, and nobody could dispose it.
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(_ =>
        {
            // Increment outside the lock so counter notifications are not raised
            // while Sync is held.
            var handle = this.timeCount.Increment();
            lock (this.Sync)
            {
                handle.AddTo(this.timeCountDisposable);
            }

            // Once the time counter is at its limit, every further tick counts as an error.
            if (this.timeCount.Max == this.timeCount.Count)
            {
                this.errorCount.Increment();
            }
        })
        .AddTo(this.disposable);

        source.Subscribe(_ =>
        {
            // Swap in a fresh container under the lock, then dispose the old one
            // outside it so the resulting counter updates do not run under Sync.
            CompositeDisposable expired;
            lock (this.Sync)
            {
                expired = this.timeCountDisposable;
                this.timeCountDisposable = new CompositeDisposable();
            }

            expired.Dispose();
        })
        .AddTo(this.disposable);
    }

    /// <summary>
    /// Emits a <see cref="Unit"/> each time the time counter reports
    /// <c>CountChangedStatus.Max</c>. A new filtered sequence is built on every access.
    /// </summary>
    public IObservable<Unit> Timeouts
    {
        get { return this.timeCount.Where(s => s == CountChangedStatus.Max).ToUnit(); }
    }

    /// <summary>
    /// Emits a <see cref="Unit"/> each time the error counter reports
    /// <c>CountChangedStatus.Max</c>. A new filtered sequence is built on every access.
    /// </summary>
    public IObservable<Unit> Errors
    {
        get { return this.errorCount.Where(s => s == CountChangedStatus.Max).ToUnit(); }
    }

    /// <summary>
    /// Disposes the ticker and source subscriptions created by the constructor.
    /// </summary>
    public void Dispose()
    {
        this.disposable.Dispose();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment