Created
May 12, 2014 13:32
-
-
Save jchapuis/56c7c50fc8cd1b39e091 to your computer and use it in GitHub Desktop.
ValveSubject: supports turning on/off the output stream. Incoming events are buffered while the valve is closed, and released in order when opening it. The valve supports opening/closing any number of times.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class ValveSubject<T> : ISubject<T> | |
{ | |
private enum Valve | |
{ | |
Open, | |
Closed | |
} | |
private readonly Subject<T> input = new Subject<T>(); | |
private readonly Subject<Valve> valveSubject = new Subject<Valve>(); | |
private readonly Subject<T> output = new Subject<T>(); | |
public ValveSubject() | |
{ | |
var valveOperations = valveSubject.DistinctUntilChanged(); | |
var valveClosings = valveOperations.Where(v => v == Valve.Closed); | |
var valveOpenings = valveOperations.Where(v => v == Valve.Open); | |
var bufferedInputChunksWhenValveIsClosed = input.Buffer(valveClosings, _ => valveOpenings); | |
var inputsWhenValveIsOpen = input.CombineLatest(valveOperations, (t, valve) => new { Valve = valve, Value = t }).DistinctUntilChanged(c => c.Value).Where(c => c.Valve == Valve.Open).Select(c => c.Value); | |
inputsWhenValveIsOpen.Merge(bufferedInputChunksWhenValveIsClosed.SelectMany(t => t)).Subscribe(output); | |
valveSubject.OnNext(Valve.Open); | |
} | |
public void Open() | |
{ | |
valveSubject.OnNext(Valve.Open); | |
} | |
public void Close() | |
{ | |
valveSubject.OnNext(Valve.Closed); | |
} | |
public void OnNext(T value) | |
{ | |
input.OnNext(value); | |
} | |
public void OnError(Exception error) | |
{ | |
input.OnError(error); | |
} | |
public void OnCompleted() | |
{ | |
input.OnCompleted(); | |
valveSubject.OnCompleted(); | |
output.OnCompleted(); | |
} | |
public IDisposable Subscribe(IObserver<T> observer) | |
{ | |
return output.Subscribe(observer); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment