Skip to content

Instantly share code, notes, and snippets.

@alexr
Created December 2, 2013 01:19
Show Gist options
  • Save alexr/7743427 to your computer and use it in GitHub Desktop.
Save alexr/7743427 to your computer and use it in GitHub Desktop.
[Rx] MultiGroupReplaySubject - similar to `ReplaySubject`, but requires the observable element type to be `KeyValuePair<TKey, TValue>` and maintains a replay buffer per group. On subscribe it replays the buffers of all groups. Note that there is no way to delete a group, so if new groups keep appearing in the input it will eventually run out of memory.
namespace System.Reactive.Subjects
{
using System;
using System.Collections.Generic;
/// <summary>
/// Specialized version of ReplaySubject that processes keyed events and replays the last
/// <c>bufferSize</c> values of each group. Each notification is broadcast to all subscribed
/// and future observers.
/// </summary>
/// <remarks>
/// Groups are never removed while the subject is alive, so the replay state grows with the
/// number of distinct keys seen. All state is guarded by a single lock, and observers are
/// invoked while that lock is held.
/// </remarks>
/// <typeparam name="TKey">The type of the partitioning key.</typeparam>
/// <typeparam name="TValue">The type of the event value.</typeparam>
public sealed class MultiGroupReplaySubject<TKey, TValue> : ISubject<KeyValuePair<TKey, TValue>>, IDisposable
{
    // Shared empty observer set, used for a fresh or terminated subject.
    static readonly IObserver<KeyValuePair<TKey, TValue>>[] s_noObservers = new IObserver<KeyValuePair<TKey, TValue>>[0];

    readonly object _gate = new object();
    readonly Dictionary<TKey, Queue<TValue>> _queues;
    readonly int _bufferSize;
    bool _isDisposed;
    bool _isStopped;
    Exception _error;

    // Copy-on-write: the array instance is never mutated after being published, so a
    // dispatch loop can keep iterating its captured array even if an observer subscribes,
    // unsubscribes or disposes the subject from inside a callback. Null once disposed.
    IObserver<KeyValuePair<TKey, TValue>>[] _observers;

    /// <summary>
    /// Creates a MultiGroupReplaySubject with the specified buffer size per group.
    /// </summary>
    /// <param name="bufferSize">Maximal number of values per group to replay.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is negative.</exception>
    public MultiGroupReplaySubject(int bufferSize)
    {
        // TODO: bufferSize of 0 is kind of degenerative case, probably should disallow.
        if (bufferSize < 0)
        {
            throw new ArgumentOutOfRangeException("bufferSize");
        }

        _bufferSize = bufferSize;
        _queues = new Dictionary<TKey, Queue<TValue>>();
        _isStopped = false;
        _error = null;
        _observers = s_noObservers;
    }

    /// <summary>
    /// Indicates whether the subject has observers subscribed to it.
    /// Returns false once the subject has been disposed.
    /// </summary>
    public bool HasObservers
    {
        get
        {
            // Read under the lock so a concurrent Dispose cannot null the field between
            // the null check and the length check.
            lock (_gate)
            {
                return _observers != null && _observers.Length > 0;
            }
        }
    }

    /// <summary>
    /// Records the value in the replay buffer of its group and notifies all subscribed
    /// observers about its arrival. Ignored once the subject has terminated.
    /// </summary>
    /// <param name="kvp">The key/value pair to send to all observers; the key selects the group.</param>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public void OnNext(KeyValuePair<TKey, TValue> kvp)
    {
        lock (_gate)
        {
            CheckDisposed();
            if (_isStopped)
            {
                return;
            }

            Queue<TValue> queue;
            if (!_queues.TryGetValue(kvp.Key, out queue))
            {
                queue = new Queue<TValue>();
                _queues.Add(kvp.Key, queue);
            }

            // Enqueue for new and existing groups alike; the first value of a group must
            // be replayable too.
            queue.Enqueue(kvp.Value);

            // Only this queue grew, so it is the only one that can exceed the bound.
            while (queue.Count > _bufferSize)
            {
                queue.Dequeue();
            }

            // Iterate the captured array; callbacks may replace _observers.
            var observers = _observers;
            foreach (var observer in observers)
            {
                observer.OnNext(kvp);
            }
        }
    }

    /// <summary>
    /// Notifies all subscribed and future observers about the specified exception.
    /// Ignored if the subject has already terminated.
    /// </summary>
    /// <param name="error">The exception to send to all observers.</param>
    /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public void OnError(Exception error)
    {
        if (error == null)
        {
            throw new ArgumentNullException("error");
        }

        lock (_gate)
        {
            CheckDisposed();
            if (_isStopped)
            {
                return;
            }

            _isStopped = true;
            _error = error;

            // Detach the observers before dispatching so that a Dispose performed inside
            // a callback is not overwritten afterwards.
            var observers = _observers;
            _observers = s_noObservers;
            foreach (var observer in observers)
            {
                observer.OnError(error);
            }
        }
    }

    /// <summary>
    /// Notifies all subscribed and future observers about the end of the sequence.
    /// Ignored if the subject has already terminated.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public void OnCompleted()
    {
        lock (_gate)
        {
            CheckDisposed();
            if (_isStopped)
            {
                return;
            }

            _isStopped = true;

            // Detach the observers before dispatching so that a Dispose performed inside
            // a callback is not overwritten afterwards.
            var observers = _observers;
            _observers = s_noObservers;
            foreach (var observer in observers)
            {
                observer.OnCompleted();
            }
        }
    }

    /// <summary>
    /// Subscribes an observer to the subject. The buffered values of every group are
    /// replayed to the observer first, followed by the terminal notification if the
    /// subject has already terminated.
    /// </summary>
    /// <param name="observer">Observer to subscribe to the subject.</param>
    /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public IDisposable Subscribe(IObserver<KeyValuePair<TKey, TValue>> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }

        var subscription = new RemovableDisposable(this, observer);
        lock (_gate)
        {
            CheckDisposed();

            // Snapshot the buffers and the terminal state before calling the observer:
            // a callback that re-enters the subject would otherwise mutate the
            // dictionary or queues while they are being enumerated.
            var replay = new List<KeyValuePair<TKey, TValue>>();
            foreach (var group in _queues)
            {
                foreach (var item in group.Value)
                {
                    replay.Add(new KeyValuePair<TKey, TValue>(group.Key, item));
                }
            }

            var stopped = _isStopped;
            var error = _error;

            // A terminated subject sends nothing further, so there is no point in
            // retaining the observer.
            if (!stopped)
            {
                var current = _observers;
                var updated = new IObserver<KeyValuePair<TKey, TValue>>[current.Length + 1];
                Array.Copy(current, updated, current.Length);
                updated[current.Length] = observer;
                _observers = updated;
            }

            foreach (var item in replay)
            {
                observer.OnNext(item);
            }

            if (error != null)
            {
                observer.OnError(error);
            }
            else if (stopped)
            {
                observer.OnCompleted();
            }
        }

        return subscription;
    }

    // Removes the first registration of the observer, if any. No-op after Dispose.
    void Unsubscribe(IObserver<KeyValuePair<TKey, TValue>> observer)
    {
        lock (_gate)
        {
            if (_isDisposed)
            {
                return;
            }

            var current = _observers;
            var index = Array.IndexOf(current, observer);
            if (index < 0)
            {
                return;
            }

            if (current.Length == 1)
            {
                _observers = s_noObservers;
                return;
            }

            // Publish a new array rather than editing in place (copy-on-write).
            var updated = new IObserver<KeyValuePair<TKey, TValue>>[current.Length - 1];
            Array.Copy(current, 0, updated, 0, index);
            Array.Copy(current, index + 1, updated, index, current.Length - index - 1);
            _observers = updated;
        }
    }

    // Subscription handle: disposing it unsubscribes the observer from the subject.
    sealed class RemovableDisposable : IDisposable
    {
        readonly MultiGroupReplaySubject<TKey, TValue> _subject;
        readonly IObserver<KeyValuePair<TKey, TValue>> _observer;

        public RemovableDisposable(MultiGroupReplaySubject<TKey, TValue> subject, IObserver<KeyValuePair<TKey, TValue>> observer)
        {
            _subject = subject;
            _observer = observer;
        }

        public void Dispose()
        {
            _subject.Unsubscribe(_observer);
        }
    }

    // Throws ObjectDisposedException if the subject has been disposed. Call under _gate.
    void CheckDisposed()
    {
        if (_isDisposed)
        {
            throw new ObjectDisposedException(string.Empty);
        }
    }

    /// <summary>
    /// Unsubscribes all observers and releases the replay buffers. Afterwards the
    /// notification methods and Subscribe throw <see cref="ObjectDisposedException"/>.
    /// </summary>
    public void Dispose()
    {
        lock (_gate)
        {
            _isDisposed = true;
            _observers = null;
            _queues.Clear();
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment