Created
September 18, 2014 12:34
-
-
Save runceel/ffccfb60c6c451fa4417 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Codeplex.Reactive.Extensions; | |
using Codeplex.Reactive.Notifiers; | |
using System; | |
using System.Diagnostics; | |
using System.Reactive; | |
using System.Reactive.Disposables; | |
using System.Reactive.Linq; | |
using System.Reactive.Subjects; | |
namespace ConsoleApplication6 | |
{ | |
/// <summary>
/// Console demo: every key press pushes one sample into a
/// <see cref="MitsubaObservable{T}"/> and its timeout/error notifications
/// are printed to the console.
/// </summary>
class Program
{
    /// <summary>
    /// Wires a subject to the monitor, prints notifications, then feeds the
    /// subject one value per key press. The loop never terminates.
    /// </summary>
    /// <param name="args">Command-line arguments (not used).</param>
    static void Main(string[] args)
    {
        var input = new Subject<double>();
        var monitor = new MitsubaObservable<double>(input);

        // Subscribe to errors first, then timeouts, before any data is pushed.
        monitor.Errors.Subscribe(_ => Console.WriteLine("Error"));
        monitor.Timeouts.Subscribe(_ => Console.WriteLine("Timeout"));

        // Block on the keyboard; each key press stands in for one incoming sample.
        for (;;)
        {
            Console.ReadKey();
            input.OnNext(1.0);
        }
    }
}
/// <summary>
/// Watches a source sequence for silence. A one-second interval increments a
/// timeout counter (limit <c>TimeoutSecond</c>); every value from the source
/// disposes the handles collected from those increments. Whenever a tick finds
/// the timeout counter at its maximum, an error counter (limit
/// <c>ErrorLimit</c>) is incremented as well.
/// </summary>
/// <typeparam name="T">Element type of the monitored sequence; the values themselves are ignored.</typeparam>
class MitsubaObservable<T> : IDisposable
{
    private const int TimeoutSecond = 10;
    private const int ErrorLimit = 30;

    // Guards reads and swaps of timeCountDisposable, which is touched by both
    // the interval callback and the source callback.
    private readonly object Sync = new object();

    // Holds the interval subscription and the source subscription.
    private readonly CompositeDisposable disposable = new CompositeDisposable();
    private readonly CountNotifier timeCount = new CountNotifier(TimeoutSecond);
    private readonly CountNotifier errorCount = new CountNotifier(ErrorLimit);

    // Handles returned by timeCount.Increment() since the last source value.
    private CompositeDisposable timeCountDisposable = new CompositeDisposable();

    /// <summary>
    /// Starts the one-second interval and subscribes to <paramref name="source"/>.
    /// </summary>
    /// <param name="source">Sequence whose activity is monitored.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public MitsubaObservable(IObservable<T> source)
    {
        // Validate before starting the interval so a bad argument cannot leave
        // a running timer behind.
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(_ =>
        {
            // Take a consistent snapshot of the current handle container; the
            // source callback may replace it concurrently.
            CompositeDisposable handles;
            lock (this.Sync)
            {
                handles = this.timeCountDisposable;
            }

            this.timeCount.Increment().AddTo(handles);

            // NOTE(review): this is true on the tick that reaches the limit and,
            // presumably, on every later tick while the counter stays at Max,
            // so errors accumulate once per second of continued silence - confirm.
            if (this.timeCount.Max == this.timeCount.Count)
            {
                this.errorCount.Increment();
            }
        })
        .AddTo(this.disposable);

        source.Subscribe(_ =>
        {
            // Swap in a fresh container under the lock, then dispose the old one
            // outside it so callbacks triggered by the disposal never run while
            // Sync is held. Disposing the Increment() handles presumably rolls
            // the timeout counter back - confirm against CountNotifier.
            CompositeDisposable stale;
            lock (this.Sync)
            {
                stale = this.timeCountDisposable;
                this.timeCountDisposable = new CompositeDisposable();
            }

            stale.Dispose();
        })
        .AddTo(this.disposable);
    }

    /// <summary>
    /// Emits a <see cref="Unit"/> each time the timeout counter reports
    /// <c>CountChangedStatus.Max</c>. A new filtered sequence is built on every access.
    /// </summary>
    public IObservable<Unit> Timeouts
    {
        get { return this.timeCount.Where(s => s == CountChangedStatus.Max).ToUnit(); }
    }

    /// <summary>
    /// Emits a <see cref="Unit"/> each time the error counter reports
    /// <c>CountChangedStatus.Max</c>. A new filtered sequence is built on every access.
    /// </summary>
    public IObservable<Unit> Errors
    {
        get { return this.errorCount.Where(s => s == CountChangedStatus.Max).ToUnit(); }
    }

    /// <summary>
    /// Disposes the interval subscription and the source subscription.
    /// </summary>
    public void Dispose()
    {
        this.disposable.Dispose();
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment