Created
December 2, 2013 01:19
-
-
Save alexr/7743427 to your computer and use it in GitHub Desktop.
[Rx] MultiGroupReplaySubject - similar to `ReplaySubject`, but requires the observable element type to be `KeyValuePair<TKey, TValue>` and maintains a replay buffer per group. On subscribe it replays the buffers of all groups. Note that there is no way to delete a group, so if new groups keep appearing in the input it will eventually run out of memory.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace System.Reactive.Subjects | |
{ | |
using System; | |
using System.Collections.Generic; | |
/// <summary> | |
/// Specialized version of ReplaySubject which processes keyed events and replays the
/// most recent values of each group, up to the configured buffer size per group.
/// Each notification is broadcast to all subscribed and future observers.
/// </summary> | |
/// <typeparam name="TKey">The type of the partitioning key.</typeparam> | |
/// <typeparam name="TValue">The type of the event value.</typeparam> | |
public sealed class MultiGroupReplaySubject<TKey, TValue> : ISubject<KeyValuePair<TKey, TValue>>, IDisposable
{
    // Guards all mutable state. Notifications are delivered while holding the gate so that
    // replayed and live values reach each observer in a consistent order.
    readonly object _gate = new object();

    // One replay buffer per key; each queue keeps at most _bufferSize most recent values.
    // Groups are never removed, so the number of queues grows with the number of distinct keys.
    readonly Dictionary<TKey, Queue<TValue>> _queues;
    readonly int _bufferSize;
    bool _isDisposed;
    bool _isStopped;
    Exception _error;

    // Copy-on-write: the list instance referenced here is never mutated after it has been
    // published. Subscribe/Unsubscribe swap in a new list instead, so a notification loop that
    // captured the previous instance can keep enumerating it even if an observer subscribes or
    // unsubscribes reentrantly from inside its callback.
    List<IObserver<KeyValuePair<TKey, TValue>>> _observers;

    /// <summary>
    /// Creates a MultiGroupReplaySubject specifying the buffer size per group.
    /// </summary>
    /// <param name="bufferSize">Maximal number of values per group to replay.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is negative.</exception>
    public MultiGroupReplaySubject(int bufferSize)
    {
        // TODO: bufferSize of 0 is kind of degenerative case, probably should disallow.
        if (bufferSize < 0)
            throw new ArgumentOutOfRangeException("bufferSize");

        _bufferSize = bufferSize;
        _queues = new Dictionary<TKey, Queue<TValue>>();
        _isStopped = false;
        _error = null;
        _observers = new List<IObserver<KeyValuePair<TKey, TValue>>>();
    }

    /// <summary>
    /// Indicates whether the subject has observers subscribed to it.
    /// Returns false once the subject has been disposed.
    /// </summary>
    public bool HasObservers
    {
        get
        {
            lock (_gate)
            {
                return _observers != null && _observers.Count > 0;
            }
        }
    }

    /// <summary>
    /// Notifies all subscribed and future observers about the arrival of the specified element in the sequence.
    /// The value is appended to the replay buffer of its key before it is broadcast.
    /// </summary>
    /// <param name="kvp">The keyed value to send to all observers.</param>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public void OnNext(KeyValuePair<TKey, TValue> kvp)
    {
        lock (_gate)
        {
            CheckDisposed();
            if (_isStopped)
                return;

            Buffer(kvp);

            // Capture the current list; see the copy-on-write note on _observers.
            var observers = _observers;
            foreach (var observer in observers)
                observer.OnNext(kvp);
        }
    }

    /// <summary>
    /// Notifies all subscribed and future observers about the specified exception.
    /// </summary>
    /// <param name="error">The exception to send to all observers.</param>
    /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public void OnError(Exception error)
    {
        if (error == null)
            throw new ArgumentNullException("error");

        lock (_gate)
        {
            CheckDisposed();
            if (_isStopped)
                return;

            _isStopped = true;
            _error = error;

            // Detach the observers before notifying them so that reentrant calls made from
            // inside a callback already see the subject as terminated and without observers.
            var observers = _observers;
            _observers = new List<IObserver<KeyValuePair<TKey, TValue>>>();
            foreach (var observer in observers)
                observer.OnError(error);
        }
    }

    /// <summary>
    /// Notifies all subscribed and future observers about the end of the sequence.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public void OnCompleted()
    {
        lock (_gate)
        {
            CheckDisposed();
            if (_isStopped)
                return;

            _isStopped = true;

            // Detach the observers before notifying them (same reasoning as in OnError).
            var observers = _observers;
            _observers = new List<IObserver<KeyValuePair<TKey, TValue>>>();
            foreach (var observer in observers)
                observer.OnCompleted();
        }
    }

    /// <summary>
    /// Subscribes an observer to the subject. The buffered values of every group are replayed
    /// to the observer, followed by the terminal notification if the subject has already
    /// completed or failed.
    /// </summary>
    /// <param name="observer">Observer to subscribe to the subject.</param>
    /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public IDisposable Subscribe(IObserver<KeyValuePair<TKey, TValue>> observer)
    {
        if (observer == null)
            throw new ArgumentNullException("observer");

        var subscription = new RemovableDisposable(this, observer);
        lock (_gate)
        {
            CheckDisposed();

            // Snapshot the buffers and the terminal state before invoking the observer: its
            // callbacks may reenter the subject (the lock is reentrant) and modify the queues
            // or terminate the sequence while the replay is still in progress.
            var replay = new List<KeyValuePair<TKey, TValue>>();
            foreach (var group in _queues)
                foreach (var item in group.Value)
                    replay.Add(new KeyValuePair<TKey, TValue>(group.Key, item));
            var stopped = _isStopped;
            var error = _error;

            // A terminated subject never notifies again, so registering the observer would
            // only keep it alive and make HasObservers report a stale subscription.
            if (!stopped)
            {
                var extended = new List<IObserver<KeyValuePair<TKey, TValue>>>(_observers);
                extended.Add(observer);
                _observers = extended;
            }

            foreach (var item in replay)
                observer.OnNext(item);

            if (error != null)
                observer.OnError(error);
            else if (stopped)
                observer.OnCompleted();
        }
        return subscription;
    }

    // Removes one registration of the observer; no-op if the subject is disposed or the
    // observer is not (or no longer) registered.
    void Unsubscribe(IObserver<KeyValuePair<TKey, TValue>> observer)
    {
        lock (_gate)
        {
            if (_isDisposed || !_observers.Contains(observer))
                return;

            // Copy-on-write: leave the published list untouched for in-flight notification loops.
            var remaining = new List<IObserver<KeyValuePair<TKey, TValue>>>(_observers);
            remaining.Remove(observer);
            _observers = remaining;
        }
    }

    // Appends the value to the replay buffer of its key, creating the buffer on first use,
    // and drops the oldest entries beyond _bufferSize. Must be called with _gate held.
    void Buffer(KeyValuePair<TKey, TValue> kvp)
    {
        Queue<TValue> queue;
        if (!_queues.TryGetValue(kvp.Key, out queue))
        {
            queue = new Queue<TValue>();
            _queues.Add(kvp.Key, queue);
        }

        queue.Enqueue(kvp.Value);

        // Only the queue that just grew can exceed the limit, so there is no need to visit
        // the other groups.
        while (queue.Count > _bufferSize)
            queue.Dequeue();
    }

    // Subscription handle returned from Subscribe; disposing it unsubscribes the observer.
    sealed class RemovableDisposable : IDisposable
    {
        readonly MultiGroupReplaySubject<TKey, TValue> _subject;
        IObserver<KeyValuePair<TKey, TValue>> _observer;

        public RemovableDisposable(MultiGroupReplaySubject<TKey, TValue> subject, IObserver<KeyValuePair<TKey, TValue>> observer)
        {
            _subject = subject;
            _observer = observer;
        }

        public void Dispose()
        {
            // Idempotent: only the first call unsubscribes. A repeated Dispose must not remove
            // another registration of the same observer instance.
            var observer = System.Threading.Interlocked.Exchange(ref _observer, null);
            if (observer != null)
                _subject.Unsubscribe(observer);
        }
    }

    // Throws if the subject has been disposed. Must be called with _gate held.
    void CheckDisposed()
    {
        if (_isDisposed)
            throw new ObjectDisposedException(string.Empty);
    }

    /// <summary>
    /// Releases all observers and buffered values. Any subsequent call to OnNext, OnError,
    /// OnCompleted or Subscribe throws <see cref="ObjectDisposedException"/>.
    /// </summary>
    public void Dispose()
    {
        lock (_gate)
        {
            _isDisposed = true;
            _observers = null;
            // Nothing can be replayed after disposal, so let the buffered values be collected.
            _queues.Clear();
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment