Skip to content

Instantly share code, notes, and snippets.

@Kralizek
Last active January 12, 2019 02:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Kralizek/f49b23e1498c039a1e1d5e1c3924fcb7 to your computer and use it in GitHub Desktop.
Save Kralizek/f49b23e1498c039a1e1d5e1c3924fcb7 to your computer and use it in GitHub Desktop.
A Rx subject that stores all values until an observer subscribes.
public class QueueSubject<T> : ISubject<T>
{
private readonly Subject<T> _subject = new Subject<T>();
private readonly Queue<Action<IObserver<T>>> _actions = new Queue<Action<IObserver<T>>>();
private bool _isCompleted = false;
private Exception _error;
public bool IsRunning => !_isCompleted && _error == null;
public bool HasObservers => _refCount > 0;
public void OnCompleted()
{
_isCompleted = true;
if (!HasObservers)
{
_actions.Enqueue(o => o.OnCompleted());
}
_subject.OnCompleted();
}
public void OnError(Exception error)
{
_error = error;
if (!HasObservers)
{
_actions.Enqueue(o => o.OnError(error));
}
_subject.OnError(error);
}
public void OnNext(T value)
{
if (IsRunning)
{
if (HasObservers)
{
_subject.OnNext(value);
}
else
{
_actions.Enqueue(o => o.OnNext(value));
}
}
}
private int _refCount = 0;
public IDisposable Subscribe(IObserver<T> observer)
{
Interlocked.Increment(ref _refCount);
return new CompositeDisposable(
Observable.Create<T>(o => ConsumeActions(o)).Concat(_subject).Subscribe(observer),
Disposable.Create(() => Interlocked.Decrement(ref _refCount))
);
}
private IDisposable ConsumeActions(IObserver<T> observable)
{
while (_actions.Count > 0)
{
var action = _actions.Dequeue();
action(observable);
}
observable.OnCompleted();
return Disposable.Empty;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment