Skip to content

Instantly share code, notes, and snippets.

@james-world
Last active August 29, 2015 14:07
Show Gist options
  • Save james-world/62dca2fe2f91531a0401 to your computer and use it in GitHub Desktop.
Save james-world/62dca2fe2f91531a0401 to your computer and use it in GitHub Desktop.
This is an answer to a Stack Overflow question on Rx - see the link in the comments. It deals with detecting, from a stream of item status updates, when one item has been stuck in a particular state for too long.
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using NUnit.Framework;
namespace SOStuck
{
/// <summary>
/// Virtual-time tests for the <c>StuckInfos</c> operator. Each test replays a cold
/// sequence of status updates on a <c>TestScheduler</c> and checks which updates
/// are reported, and at what virtual time.
/// </summary>
public class StuckDetectorTests : ReactiveTest
{
    /// <summary>
    /// Converts a whole number of minutes into scheduler ticks.
    /// </summary>
    private static long Minutes(int minutes)
    {
        return TimeSpan.FromMinutes(minutes).Ticks;
    }

    /// <summary>
    /// A lone STARTED update at 5 minutes is reported 30 minutes later.
    /// </summary>
    [Test]
    public void FindSingleStuckItem()
    {
        var scheduler = new TestScheduler();
        var observer = scheduler.CreateObserver<MyInfo>();
        var source = scheduler.CreateColdObservable(
            OnNext(Minutes(5), MyInfo.Started("1")));

        source.StuckInfos(scheduler).Subscribe(observer);
        scheduler.Start();

        observer.Messages.AssertEqual(
            OnNext(Minutes(35), MyInfo.Started("1")));
    }

    /// <summary>
    /// A STOPPED update on its own never produces a report.
    /// </summary>
    [Test]
    public void NoStuckItems()
    {
        var scheduler = new TestScheduler();
        var observer = scheduler.CreateObserver<MyInfo>();
        var source = scheduler.CreateColdObservable(
            OnNext(TimeSpan.FromSeconds(0).Ticks, MyInfo.Stopped("1")));

        source.StuckInfos(scheduler).Subscribe(observer);
        scheduler.Start();

        observer.Messages.AssertEqual();
    }

    /// <summary>
    /// Among several items that start and stop in time, only item "3",
    /// which never stops, is reported (30 minutes after it started).
    /// </summary>
    [Test]
    public void StuckItemInTraffic()
    {
        var scheduler = new TestScheduler();
        var observer = scheduler.CreateObserver<MyInfo>();
        var source = scheduler.CreateColdObservable(
            OnNext(Minutes(0), MyInfo.Started("1")),
            OnNext(Minutes(5), MyInfo.Started("2")),
            OnNext(Minutes(10), MyInfo.Started("3")),
            OnNext(Minutes(15), MyInfo.Started("4")),
            OnNext(Minutes(20), MyInfo.Stopped("1")),
            OnNext(Minutes(25), MyInfo.Stopped("2")),
            OnNext(Minutes(35), MyInfo.Stopped("4")));

        source.StuckInfos(scheduler).Subscribe(observer);
        scheduler.Start();

        observer.Messages.AssertEqual(
            OnNext(Minutes(40), MyInfo.Started("3")));
    }

    /// <summary>
    /// An item that completed one start/stop cycle is still reported when a
    /// later STARTED update is not followed by a stop.
    /// </summary>
    [Test]
    public void StuckItemThatStoppedOkOnce()
    {
        var scheduler = new TestScheduler();
        var observer = scheduler.CreateObserver<MyInfo>();
        var source = scheduler.CreateColdObservable(
            OnNext(Minutes(0), MyInfo.Started("1")),
            OnNext(Minutes(5), MyInfo.Stopped("1")),
            OnNext(Minutes(10), MyInfo.Started("1")));

        source.StuckInfos(scheduler).Subscribe(observer);
        scheduler.Start();

        observer.Messages.AssertEqual(
            OnNext(Minutes(40), MyInfo.Started("1")));
    }
}
/// <summary>
/// Rx operators for detecting items that remain in the "STARTED" state for too long.
/// </summary>
public static class ObservableExtensions
{
    // Status value that arms the stuck timer for an item.
    private const string StartedStatus = "STARTED";

    // Time an item may stay started when the caller does not supply a timeout.
    private static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(30);

    /// <summary>
    /// Emits every "STARTED" update whose item receives no update with a different
    /// status (matched by <c>Id</c>) within <paramref name="timeout"/>.
    /// </summary>
    /// <param name="source">Stream of item status updates.</param>
    /// <param name="scheduler">
    /// Scheduler used to time the delay; <c>Scheduler.Default</c> when null.
    /// </param>
    /// <param name="timeout">
    /// How long an item may stay started before it is reported; 30 minutes when null.
    /// </param>
    /// <returns>
    /// The original "STARTED" update, emitted once <paramref name="timeout"/> has
    /// elapsed since it was received. Every "STARTED" update runs its own timer, so
    /// repeated "STARTED" updates for one item can each be reported.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is negative.</exception>
    public static IObservable<MyInfo> StuckInfos(this IObservable<MyInfo> source,
        IScheduler scheduler = null, TimeSpan? timeout = null)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        var limit = timeout ?? DefaultTimeout;
        if (limit < TimeSpan.Zero)
        {
            // Fail at call time rather than from inside the query once the first item arrives.
            throw new ArgumentOutOfRangeException("timeout");
        }

        scheduler = scheduler ?? Scheduler.Default;

        // Publish lets the timers and their cancellation signals read the same
        // published sequence instead of each subscribing to the source separately.
        return source.Publish(pub =>
            pub.Where(x => x.Status == StartedStatus)
               .SelectMany(
                   x => Observable.Return(x)
                       .Delay(limit, scheduler)
                       // Any update for the same item with another status cancels the timer.
                       .TakeUntil(pub.Where(y => y.Id == x.Id
                                                 && y.Status != StartedStatus))));
    }
}
/// <summary>
/// A status update for an item. Two instances are equal when they have the same
/// runtime type and both <see cref="Id"/> and <see cref="Status"/> match.
/// </summary>
public class MyInfo
{
    /// <summary>Identifier of the item the update refers to.</summary>
    public string Id { get; set; }

    /// <summary>Status text, e.g. "STARTED", "STOPPED" or "PHASE" followed by a number.</summary>
    public string Status { get; set; }

    /// <summary>Creates an update with status "STARTED" for <paramref name="id"/>.</summary>
    public static MyInfo Started(string id)
    {
        return new MyInfo { Id = id, Status = "STARTED" };
    }

    /// <summary>Creates an update with status "STOPPED" for <paramref name="id"/>.</summary>
    public static MyInfo Stopped(string id)
    {
        return new MyInfo { Id = id, Status = "STOPPED" };
    }

    /// <summary>
    /// Creates an update whose status is "PHASE" followed by <paramref name="phase"/>.
    /// </summary>
    public static MyInfo Phase(string id, int phase)
    {
        return new MyInfo { Id = id, Status = "PHASE" + phase };
    }

    /// <summary>
    /// Compares <see cref="Id"/> and <see cref="Status"/>; a null <paramref name="other"/>
    /// is never equal.
    /// </summary>
    protected bool Equals(MyInfo other)
    {
        // Guard for derived classes that call this overload directly with null.
        if (ReferenceEquals(null, other))
        {
            return false;
        }

        return string.Equals(Id, other.Id) && string.Equals(Status, other.Status);
    }

    /// <summary>
    /// Value equality: true only for an object of exactly the same type with
    /// matching <see cref="Id"/> and <see cref="Status"/>.
    /// </summary>
    public override bool Equals(object obj)
    {
        if (ReferenceEquals(null, obj))
        {
            return false;
        }

        if (ReferenceEquals(this, obj))
        {
            return true;
        }

        if (obj.GetType() != this.GetType())
        {
            return false;
        }

        return Equals((MyInfo)obj);
    }

    /// <summary>
    /// Hash code consistent with <see cref="Equals(object)"/>. It is derived from the
    /// settable properties, so it changes if they are modified after the instance has
    /// been placed in a hash-based collection.
    /// </summary>
    public override int GetHashCode()
    {
        unchecked
        {
            // Null fields hash as 0 so an uninitialised instance does not throw;
            // Equals already treats two null fields as equal.
            var idHash = Id != null ? Id.GetHashCode() : 0;
            var statusHash = Status != null ? Status.GetHashCode() : 0;
            return (idHash * 397) ^ statusHash;
        }
    }

    /// <summary>Value equality; two nulls are equal.</summary>
    public static bool operator ==(MyInfo left, MyInfo right)
    {
        return Equals(left, right);
    }

    /// <summary>Negation of <c>==</c>.</summary>
    public static bool operator !=(MyInfo left, MyInfo right)
    {
        return !Equals(left, right);
    }
}
}
@james-world
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment