Skip to content

Instantly share code, notes, and snippets.

@james-world
Last active December 1, 2019 16:21
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save james-world/c46f09f32e2d4f338b07 to your computer and use it in GitHub Desktop.
Save james-world/c46f09f32e2d4f338b07 to your computer and use it in GitHub Desktop.
RollingReplaySubject - Works like ReplaySubject but has buffer cleared on a TimeInterval or based on an input stream.
namespace RxExtensions
{
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
public class RollingReplaySubject
{
public static RollingReplaySubject<TSource, long> Create<TSource>(
TimeSpan bufferClearingInterval)
{
return
new RollingReplaySubject<TSource, long>(
Observable.Interval(bufferClearingInterval));
}
public static RollingReplaySubject<TSource, long> Create<TSource>(
TimeSpan bufferClearingInterval, IScheduler scheduler)
{
return
new RollingReplaySubject<TSource, long>(
Observable.Interval(bufferClearingInterval, scheduler));
}
protected class NopSubject<TSource> : ISubject<TSource>
{
public static readonly NopSubject<TSource> Default = new NopSubject<TSource>();
public void OnCompleted()
{
}
public void OnError(Exception error)
{
}
public void OnNext(TSource value)
{
}
public IDisposable Subscribe(IObserver<TSource> observer)
{
return Disposable.Empty;
}
}
}
public class RollingReplaySubject<TSource, TBufferClearing> : RollingReplaySubject, ISubject<TSource>
{
private readonly ReplaySubject<IObservable<TSource>> _subjects;
private readonly IObservable<TSource> _concatenatedSubjects;
private ISubject<TSource> _currentSubject;
private readonly IDisposable _bufferClearingHandle;
private readonly object _gate = new object();
public RollingReplaySubject(IObservable<TBufferClearing> bufferClearing)
{
_bufferClearingHandle = bufferClearing.Synchronize(_gate).Subscribe(_ => Clear());
_subjects = new ReplaySubject<IObservable<TSource>>(1);
_concatenatedSubjects = _subjects.Concat();
_currentSubject = new ReplaySubject<TSource>();
_subjects.OnNext(_currentSubject);
}
private void Clear()
{
_currentSubject.OnCompleted();
_currentSubject = new ReplaySubject<TSource>();
_subjects.OnNext(_currentSubject);
}
public void OnNext(TSource value)
{
lock (_gate)
{
_currentSubject.OnNext(value);
}
}
public void OnError(Exception error)
{
lock (_gate)
{
_currentSubject.OnError(error);
_currentSubject = NopSubject<TSource>.Default;
_bufferClearingHandle.Dispose();
}
}
public void OnCompleted()
{
lock (_gate)
{
_currentSubject.OnCompleted();
_subjects.OnCompleted();
_currentSubject = NopSubject<TSource>.Default;
_bufferClearingHandle.Dispose();
}
}
public IDisposable Subscribe(IObserver<TSource> observer)
{
return _concatenatedSubjects.Subscribe(observer);
}
}
}
namespace RxExtensions.Tests
{
using Microsoft.Reactive.Testing;
using NUnit.Framework;
using System;
using System.Reactive;
using RxExtensions;
public class RollingReplaySubjectTests : ReactiveTest
{
[Test]
public void ReplaysEvents()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3));
var sut = RollingReplaySubject.Create<int>(TimeSpan.FromDays(1), scheduler);
source.Subscribe(sut.OnNext, sut.OnError, sut.OnCompleted);
var results = scheduler.CreateObserver<int>();
scheduler.AdvanceBy(1000);
sut.Subscribe(results);
results.Messages.AssertEqual(
OnNext(1000,1),
OnNext(1000,2),
OnNext(1000,3));
}
[Test]
public void PlaysLiveEvents()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3));
var sut = RollingReplaySubject.Create<int>(TimeSpan.FromDays(1), scheduler);
source.Subscribe(sut.OnNext, sut.OnError, sut.OnCompleted);
var results = scheduler.CreateObserver<int>();
sut.Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3));
}
[Test]
public void ReplaysEventsThenPlaysLiveEvents()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3),
OnNext(400, 4),
OnNext(500, 5),
OnNext(600, 6));
var sut = RollingReplaySubject.Create<int>(TimeSpan.FromDays(1), scheduler);
source.Subscribe(sut.OnNext, sut.OnError, sut.OnCompleted);
var results = scheduler.CreateObserver<int>();
scheduler.AdvanceBy(300);
sut.Subscribe(results);
scheduler.AdvanceBy(1000);
results.Messages.AssertEqual(
OnNext(300, 1),
OnNext(300, 2),
OnNext(300, 3),
OnNext(400, 4),
OnNext(500, 5),
OnNext(600, 6));
}
[Test]
public void ClearsBufferEveryTimeSpan()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3),
OnNext(400, 4),
OnNext(500, 5),
OnNext(600, 6));
var sut = RollingReplaySubject.Create<int>(TimeSpan.FromTicks(200), scheduler);
source.Subscribe(sut.OnNext, sut.OnError, sut.OnCompleted);
var results1 = scheduler.CreateObserver<int>();
var results2 = scheduler.CreateObserver<int>();
var results3 = scheduler.CreateObserver<int>();
sut.Subscribe(results1);
scheduler.AdvanceTo(300);
sut.Subscribe(results2);
scheduler.AdvanceTo(400);
sut.Subscribe(results3);
scheduler.AdvanceTo(1000);
results1.Messages.AssertEqual(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3),
OnNext(400, 4),
OnNext(500, 5),
OnNext(600, 6));
results2.Messages.AssertEqual(
OnNext(300, 2),
OnNext(300, 3),
OnNext(400, 4),
OnNext(500, 5),
OnNext(600, 6));
results3.Messages.AssertEqual(
OnNext(500, 5),
OnNext(600, 6));
}
[Test]
public void ClearsBufferEveryClearingSignal()
{
var scheduler = new TestScheduler();
var clearing = scheduler.CreateColdObservable(
OnNext(350, Unit.Default),
OnNext(500, Unit.Default));
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3),
OnNext(400, 4),
OnNext(500, 5),
OnNext(600, 6));
var sut = new RollingReplaySubject<int, Unit>(clearing);
source.Subscribe(sut.OnNext, sut.OnError, sut.OnCompleted);
var results1 = scheduler.CreateObserver<int>();
var results2 = scheduler.CreateObserver<int>();
var results3 = scheduler.CreateObserver<int>();
var results4 = scheduler.CreateObserver<int>();
sut.Subscribe(results1);
scheduler.AdvanceTo(350);
sut.Subscribe(results2);
scheduler.AdvanceTo(400);
sut.Subscribe(results3);
scheduler.AdvanceTo(500);
sut.Subscribe(results4);
scheduler.AdvanceTo(1000);
results1.Messages.AssertEqual(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3),
OnNext(400, 4),
OnNext(500, 5),
OnNext(600, 6));
results2.Messages.AssertEqual(
OnNext(400, 4),
OnNext(500, 5),
OnNext(600, 6));
results3.Messages.AssertEqual(
OnNext(400, 4),
OnNext(500, 5),
OnNext(600, 6));
results4.Messages.AssertEqual(
OnNext(500, 5),
OnNext(600, 6));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment