Last active
August 29, 2015 14:07
-
-
Save james-world/62dca2fe2f91531a0401 to your computer and use it in GitHub Desktop.
This is an answer to a StackOverflow question on Rx - see the link in the comments. It deals with detecting, from a stream of item status updates, when an item has been stuck in a particular state for too long.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Reactive.Concurrency; | |
using System.Reactive.Linq; | |
using Microsoft.Reactive.Testing; | |
using NUnit.Framework; | |
namespace SOStuck | |
{ | |
/// <summary>
/// Virtual-time tests for <c>StuckInfos</c>. Every test feeds a cold sequence of
/// status updates through the operator on a <see cref="TestScheduler"/> and checks
/// which updates are reported, and at which virtual time.
/// </summary>
public class StuckDetectorTests : ReactiveTest
{
    /// <summary>Converts a whole number of minutes into scheduler ticks.</summary>
    private static long AtMinute(int minute)
    {
        return TimeSpan.FromMinutes(minute).Ticks;
    }

    /// <summary>
    /// A lone STARTED update with no follow-up is reported 30 minutes after it arrived.
    /// </summary>
    [Test]
    public void FindSingleStuckItem()
    {
        var scheduler = new TestScheduler();
        var observer = scheduler.CreateObserver<MyInfo>();
        var updates = scheduler.CreateColdObservable(
            OnNext(AtMinute(5), MyInfo.Started("1")));

        updates.StuckInfos(scheduler).Subscribe(observer);
        scheduler.Start();

        observer.Messages.AssertEqual(
            OnNext(AtMinute(35), MyInfo.Started("1")));
    }

    /// <summary>
    /// A STOPPED update on its own never produces a report.
    /// </summary>
    [Test]
    public void NoStuckItems()
    {
        var scheduler = new TestScheduler();
        var observer = scheduler.CreateObserver<MyInfo>();
        var updates = scheduler.CreateColdObservable(
            OnNext(TimeSpan.FromSeconds(0).Ticks, MyInfo.Stopped("1")));

        updates.StuckInfos(scheduler).Subscribe(observer);
        scheduler.Start();

        observer.Messages.AssertEqual();
    }

    /// <summary>
    /// Among several interleaved items, only item "3" (which never stops) is reported.
    /// Item "4" stops 20 minutes after starting, so it is not reported either.
    /// </summary>
    [Test]
    public void StuckItemInTraffic()
    {
        var scheduler = new TestScheduler();
        var observer = scheduler.CreateObserver<MyInfo>();
        var updates = scheduler.CreateColdObservable(
            OnNext(AtMinute(0), MyInfo.Started("1")),
            OnNext(AtMinute(5), MyInfo.Started("2")),
            OnNext(AtMinute(10), MyInfo.Started("3")),
            OnNext(AtMinute(15), MyInfo.Started("4")),
            OnNext(AtMinute(20), MyInfo.Stopped("1")),
            OnNext(AtMinute(25), MyInfo.Stopped("2")),
            OnNext(AtMinute(35), MyInfo.Stopped("4")));

        updates.StuckInfos(scheduler).Subscribe(observer);
        scheduler.Start();

        observer.Messages.AssertEqual(
            OnNext(AtMinute(40), MyInfo.Started("3")));
    }

    /// <summary>
    /// An item that completed one start/stop cycle is still reported when a later
    /// start is never followed by a stop.
    /// </summary>
    [Test]
    public void StuckItemThatStoppedOkOnce()
    {
        var scheduler = new TestScheduler();
        var observer = scheduler.CreateObserver<MyInfo>();
        var updates = scheduler.CreateColdObservable(
            OnNext(AtMinute(0), MyInfo.Started("1")),
            OnNext(AtMinute(5), MyInfo.Stopped("1")),
            OnNext(AtMinute(10), MyInfo.Started("1")));

        updates.StuckInfos(scheduler).Subscribe(observer);
        scheduler.Start();

        observer.Messages.AssertEqual(
            OnNext(AtMinute(40), MyInfo.Started("1")));
    }
}
/// <summary>
/// Rx operators for spotting items that stay in the "STARTED" state for too long.
/// </summary>
public static class ObservableExtensions
{
    /// <summary>
    /// Threshold used by <see cref="StuckInfos"/> when the caller does not supply one.
    /// </summary>
    private static readonly TimeSpan DefaultStuckAfter = TimeSpan.FromMinutes(30);

    /// <summary>
    /// Reports every "STARTED" update that is not followed, within the threshold,
    /// by an update with the same <c>Id</c> and a status other than "STARTED".
    /// </summary>
    /// <param name="source">Stream of item status updates.</param>
    /// <param name="scheduler">
    /// Scheduler used to time the delay; <see cref="Scheduler.Default"/> when null.
    /// Pass a test scheduler to run in virtual time.
    /// </param>
    /// <param name="stuckAfter">
    /// How long an item may remain started before it is reported.
    /// Defaults to 30 minutes when null.
    /// </param>
    /// <returns>
    /// A sequence that emits the original "STARTED" update once the threshold has
    /// elapsed for it. Each "STARTED" update runs its own timer; a repeated
    /// "STARTED" for the same <c>Id</c> does not cancel an earlier timer.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="stuckAfter"/> is negative.</exception>
    public static IObservable<MyInfo> StuckInfos(this IObservable<MyInfo> source,
        IScheduler scheduler = null,
        TimeSpan? stuckAfter = null)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        var threshold = stuckAfter ?? DefaultStuckAfter;
        if (threshold < TimeSpan.Zero)
        {
            // Fail at call time rather than from Delay inside the SelectMany projection.
            throw new ArgumentOutOfRangeException("stuckAfter");
        }

        scheduler = scheduler ?? Scheduler.Default;

        // Publish shares one subscription to the source between the "started" trigger
        // and the per-item cancellation streams passed to TakeUntil.
        return source.Publish(pub =>
            pub.Where(x => x.Status == "STARTED")
               .SelectMany(
                   x => Observable.Return(x)
                                  .Delay(threshold, scheduler)
                                  // Drop the pending report once the same item leaves STARTED.
                                  .TakeUntil(pub.Where(y => y.Id == x.Id
                                                            && y.Status != "STARTED"))));
    }
}
/// <summary>
/// A status update for an item, identified by <see cref="Id"/>.
/// Two instances are equal when they have the same runtime type, <see cref="Id"/>
/// and <see cref="Status"/>.
/// </summary>
public class MyInfo
{
    /// <summary>Identifier of the item this update refers to.</summary>
    public string Id { get; set; }

    /// <summary>Status text, e.g. "STARTED", "STOPPED" or "PHASE" followed by a number.</summary>
    public string Status { get; set; }

    /// <summary>Creates an update with status "STARTED" for the given item.</summary>
    public static MyInfo Started(string id)
    {
        return new MyInfo { Id = id, Status = "STARTED" };
    }

    /// <summary>Creates an update with status "STOPPED" for the given item.</summary>
    public static MyInfo Stopped(string id)
    {
        return new MyInfo { Id = id, Status = "STOPPED" };
    }

    /// <summary>Creates an update whose status is "PHASE" followed by <paramref name="phase"/>.</summary>
    public static MyInfo Phase(string id, int phase)
    {
        return new MyInfo { Id = id, Status = "PHASE" + phase };
    }

    /// <summary>
    /// Compares <see cref="Id"/> and <see cref="Status"/>; returns false for a null argument.
    /// </summary>
    protected bool Equals(MyInfo other)
    {
        // ReferenceEquals avoids going through the overloaded == / != operators.
        return !ReferenceEquals(other, null)
               && string.Equals(Id, other.Id)
               && string.Equals(Status, other.Status);
    }

    /// <summary>
    /// Value equality: same runtime type, same <see cref="Id"/> and same <see cref="Status"/>.
    /// </summary>
    public override bool Equals(object obj)
    {
        if (ReferenceEquals(null, obj)) return false;
        if (ReferenceEquals(this, obj)) return true;
        if (obj.GetType() != this.GetType()) return false;
        return Equals((MyInfo)obj);
    }

    /// <summary>
    /// Hash code consistent with <see cref="Equals(object)"/>. A null <see cref="Id"/>
    /// or <see cref="Status"/> contributes 0, so instances whose properties were never
    /// set can be hashed without throwing.
    /// </summary>
    public override int GetHashCode()
    {
        unchecked
        {
            var idHash = Id != null ? Id.GetHashCode() : 0;
            var statusHash = Status != null ? Status.GetHashCode() : 0;
            return (idHash * 397) ^ statusHash;
        }
    }

    /// <summary>Value equality; two null references compare equal.</summary>
    public static bool operator ==(MyInfo left, MyInfo right)
    {
        return Equals(left, right);
    }

    /// <summary>Negation of <c>==</c>.</summary>
    public static bool operator !=(MyInfo left, MyInfo right)
    {
        return !Equals(left, right);
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
An answer for this StackOverflow question.