Skip to content

Instantly share code, notes, and snippets.

@jchapuis
Created May 12, 2014 13:32
Show Gist options
  • Save jchapuis/56c7c50fc8cd1b39e091 to your computer and use it in GitHub Desktop.
Save jchapuis/56c7c50fc8cd1b39e091 to your computer and use it in GitHub Desktop.
ValveSubject: supports turning on/off the output stream. Incoming events are buffered while the valve is closed, and released in order when opening it. The valve supports opening/closing any number of times.
/// <summary>
/// A subject whose output can be switched on and off like a valve.
/// Values pushed while the valve is open are forwarded to subscribers immediately;
/// values pushed while it is closed are buffered and released, in arrival order,
/// when the valve is opened again. The valve can be opened and closed any number of times.
/// </summary>
/// <typeparam name="T">Type of the values flowing through the subject.</typeparam>
public class ValveSubject<T> : ISubject<T>
{
    /// <summary>State of the valve controlling the output stream.</summary>
    private enum Valve
    {
        Open,
        Closed
    }

    // Raw values as pushed by callers of OnNext.
    private readonly Subject<T> input = new Subject<T>();

    // Stream of valve state changes requested through Open() / Close().
    private readonly Subject<Valve> valveSubject = new Subject<Valve>();

    // What subscribers of this subject actually observe.
    private readonly Subject<T> output = new Subject<T>();

    /// <summary>
    /// Creates the subject with the valve initially open.
    /// </summary>
    public ValveSubject()
    {
        // Repeated Open()/Close() calls that do not change the state are ignored.
        var valveOperations = valveSubject.DistinctUntilChanged();
        var valveClosings = valveOperations.Where(v => v == Valve.Closed);
        var valveOpenings = valveOperations.Where(v => v == Valve.Open);

        // A buffer starts at each closing and is emitted as one chunk at the next opening.
        var bufferedInputChunksWhenValveIsClosed = input.Buffer(valveClosings, _ => valveOpenings);

        // Tag every input event with a unique sequence number. CombineLatest re-emits the
        // latest input whenever the valve state changes; those re-emissions must be dropped,
        // but de-duplicating on the payload value itself (as was done previously) also
        // swallowed genuine consecutive inputs that happened to be equal. De-duplicating on
        // the sequence number suppresses only the valve-induced repeats.
        long nextSequence = 0;
        var sequencedInput = input.Select(value => new { Sequence = nextSequence++, Value = value });

        var inputsWhenValveIsOpen = sequencedInput
            .CombineLatest(valveOperations, (item, valve) => new { Valve = valve, item.Sequence, item.Value })
            .DistinctUntilChanged(c => c.Sequence)
            .Where(c => c.Valve == Valve.Open)
            .Select(c => c.Value);

        inputsWhenValveIsOpen
            .Merge(bufferedInputChunksWhenValveIsClosed.SelectMany(chunk => chunk))
            .Subscribe(output);

        // Must come after the subscription above: valveSubject is a plain Subject,
        // so a state pushed earlier would not be seen by the pipeline.
        valveSubject.OnNext(Valve.Open);
    }

    /// <summary>
    /// Opens the valve. Has no effect if the valve is already open.
    /// </summary>
    public void Open()
    {
        valveSubject.OnNext(Valve.Open);
    }

    /// <summary>
    /// Closes the valve. Has no effect if the valve is already closed.
    /// </summary>
    public void Close()
    {
        valveSubject.OnNext(Valve.Closed);
    }

    /// <summary>
    /// Pushes a value into the subject.
    /// </summary>
    /// <param name="value">The value to forward or buffer, depending on the valve state.</param>
    public void OnNext(T value)
    {
        input.OnNext(value);
    }

    /// <summary>
    /// Signals an error on the input stream, from which it propagates through the
    /// pipeline built in the constructor.
    /// </summary>
    /// <param name="error">The error to signal.</param>
    public void OnError(Exception error)
    {
        input.OnError(error);
    }

    /// <summary>
    /// Completes the input stream, the valve control stream and the output stream, in that order.
    /// </summary>
    public void OnCompleted()
    {
        input.OnCompleted();
        valveSubject.OnCompleted();
        // NOTE(review): the pipeline may already have completed the output at this point;
        // the explicit call guarantees subscribers are completed either way.
        output.OnCompleted();
    }

    /// <summary>
    /// Subscribes an observer to the valve-controlled output stream.
    /// </summary>
    /// <param name="observer">The observer to receive values.</param>
    /// <returns>A handle that removes the subscription when disposed.</returns>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        return output.Subscribe(observer);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment