Skip to content

Instantly share code, notes, and snippets.

@ZachBray
Created April 7, 2014 23:31
Show Gist options
  • Save ZachBray/10073398 to your computer and use it in GitHub Desktop.
Save ZachBray/10073398 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Cache
{
/// <summary>
/// Runs scheduled actions one at a time, in the order they were enqueued, without
/// holding a lock while an action executes.
/// </summary>
/// <remarks>
/// The caller whose <see cref="Schedule"/> call raises the pending count from zero to one
/// drains the queue on its own thread. Every other caller, including an action that
/// schedules more work while it is running, only enqueues and returns; its action is
/// executed later by whichever caller is currently draining.
/// </remarks>
public class SerialActionScheduler
{
    private readonly ConcurrentQueue<Action> _actions = new ConcurrentQueue<Action>();

    // Number of actions that have been enqueued but have not finished running.
    // A non-zero value means some caller is currently inside ProcessActionsSerially.
    private int _actionCount;

    /// <summary>
    /// Enqueues <paramref name="action"/> and, if no drain is in progress, runs it
    /// (and anything queued behind it) on the calling thread before returning.
    /// </summary>
    /// <param name="action">The work item to run. Must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    /// <exception cref="AggregateException">
    /// More than one action failed during the drain performed by this call. When exactly
    /// one action fails, its original exception is rethrown instead.
    /// </exception>
    public void Schedule(Action action)
    {
        // Reject null here: a null item would otherwise blow up inside the drain loop,
        // far away from the caller that supplied it.
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        _actions.Enqueue(action);
        var newActionCount = Interlocked.Increment(ref _actionCount);
        var isNextAction = newActionCount == 1;
        if (isNextAction)
        {
            ProcessActionsSerially();
        }
    }

    /// <summary>
    /// Runs queued actions until the pending count drops back to zero, then rethrows
    /// any failures that were observed along the way.
    /// </summary>
    private void ProcessActionsSerially()
    {
        List<Exception> failures = null;
        int currentActionCount;
        do
        {
            Action nextAction;
            // Schedule enqueues before it increments the count, so an item is expected
            // to be available whenever this loop is entered or repeated.
            _actions.TryDequeue(out nextAction);
            try
            {
                nextAction();
            }
            catch (Exception ex)
            {
                // Keep draining. If the exception escaped here the count would never
                // reach zero again and every later Schedule call would only enqueue,
                // leaving the scheduler permanently stuck.
                if (failures == null)
                {
                    failures = new List<Exception>();
                }
                failures.Add(ex);
            }
            currentActionCount = Interlocked.Decrement(ref _actionCount);
        }
        while (currentActionCount != 0);

        if (failures == null)
        {
            return;
        }

        if (failures.Count == 1)
        {
            // Rethrow the original exception object with its original stack trace.
            System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(failures[0]).Throw();
        }

        throw new AggregateException(failures);
    }
}
/// <summary>
/// A subject that forwards notifications to its current observers and replays the most
/// recent <c>OnNext</c> value to each new subscriber.
/// </summary>
/// <remarks>
/// Every state change (subscribe, unsubscribe, notification) is routed through a
/// <see cref="SerialActionScheduler"/>, so the observer set and the cached value are only
/// touched by one action at a time. <see cref="OnCompleted"/> and <see cref="OnError"/>
/// drop the current observers but do not record that the subject terminated and do not
/// clear the cached value: a later subscriber still receives the cached value and any
/// subsequent notifications.
/// </remarks>
public class ReplayLatestOnNextSubject<TItem> : ISubject<TItem>
{
    private readonly HashSet<IObserver<TItem>> _observers = new HashSet<IObserver<TItem>>();
    private readonly SerialActionScheduler _serialScheduler = new SerialActionScheduler();
    private TItem _cachedItem;
    // Distinguishes "no value seen yet" from a cached default(TItem).
    private bool _hasCachedItem;

    /// <summary>
    /// Sends <c>OnCompleted</c> to every current observer and then forgets them.
    /// </summary>
    public void OnCompleted()
    {
        _serialScheduler.Schedule(() =>
        {
            foreach (var observer in _observers)
            {
                observer.OnCompleted();
            }
            _observers.Clear();
        });
    }

    /// <summary>
    /// Sends <paramref name="error"/> to every current observer and then forgets them.
    /// </summary>
    /// <param name="error">The failure to forward. Must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
    public void OnError(Exception error)
    {
        if (error == null)
        {
            throw new ArgumentNullException("error");
        }

        _serialScheduler.Schedule(() =>
        {
            foreach (var observer in _observers)
            {
                observer.OnError(error);
            }
            _observers.Clear();
        });
    }

    /// <summary>
    /// Records <paramref name="value"/> as the latest value and forwards it to every
    /// current observer.
    /// </summary>
    /// <param name="value">The value to cache and publish.</param>
    public void OnNext(TItem value)
    {
        _serialScheduler.Schedule(() =>
        {
            // Update the cache before notifying so that an observer that throws cannot
            // leave later subscribers with a stale replay value.
            _cachedItem = value;
            _hasCachedItem = true;
            foreach (var observer in _observers)
            {
                observer.OnNext(value);
            }
        });
    }

    /// <summary>
    /// Registers <paramref name="observer"/>, first replaying the cached value to it if
    /// one exists.
    /// </summary>
    /// <param name="observer">The observer to register. Must not be null.</param>
    /// <returns>
    /// A disposable that schedules removal of <paramref name="observer"/> from the
    /// observer set.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<TItem> observer)
    {
        // Without this guard a null observer would be stored silently when nothing is
        // cached yet and only fail later, inside some unrelated OnNext call.
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }

        _serialScheduler.Schedule(() =>
        {
            if (_hasCachedItem)
            {
                observer.OnNext(_cachedItem);
            }
            _observers.Add(observer);
        });

        return Disposable.Create(() =>
            _serialScheduler.Schedule(() =>
            {
                _observers.Remove(observer);
            }));
    }
}
/// <summary>
/// A thread-safe, add-only cache: the first value created for a key is kept and
/// returned for every later request for that key. Entries are never removed.
/// </summary>
public class MonotonicallyGrowingCache<TKey, TItem>
{
    // Values are wrapped in Lazy so that, when several callers race on the same new key,
    // only the factory belonging to the entry that was actually stored is ever invoked.
    private readonly ConcurrentDictionary<TKey, Lazy<TItem>> _valueLookup = new ConcurrentDictionary<TKey, Lazy<TItem>>();

    /// <summary>
    /// Returns the value cached for <paramref name="key"/>, creating it with
    /// <paramref name="valueFactory"/> if the key has not been seen before.
    /// </summary>
    /// <param name="key">The key to look up.</param>
    /// <param name="valueFactory">
    /// Produces the value when the key is not cached yet. It is not invoked when an
    /// entry already exists. Must not be null.
    /// </param>
    /// <returns>The cached value for <paramref name="key"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="valueFactory"/> is null.</exception>
    public TItem FindOrCreate(TKey key, Func<TItem> valueFactory)
    {
        // Checked explicitly so a null factory is rejected even when the key is already
        // cached and no Lazy wrapper needs to be constructed.
        if (valueFactory == null)
        {
            throw new ArgumentNullException("valueFactory");
        }

        Lazy<TItem> entry;
        // Fast path: an existing key costs one lookup and no allocation.
        if (!_valueLookup.TryGetValue(key, out entry))
        {
            // GetOrAdd hands back whichever wrapper ended up in the dictionary, so a
            // caller that loses the race uses the winner's entry, not its own.
            entry = _valueLookup.GetOrAdd(key, new Lazy<TItem>(valueFactory));
        }
        return entry.Value;
    }
}
/// <summary>
/// Caches one shared observable per key. Each cached observable multicasts its source
/// through a <see cref="ReplayLatestOnNextSubject{TItem}"/>, connects to the source when
/// it is subscribed to while disconnected, and reconnects on the next subscription
/// after the source has failed.
/// </summary>
/// <remarks>
/// Disposing the cache disposes every per-key connection holder it has created.
/// </remarks>
public class ObservableLatestOnNextCache<TKey, TItem> : IDisposable
{
    // Holds one SerialDisposable per cached key; each of those holds that key's
    // current connection to its source.
    private readonly CompositeDisposable _resources = new CompositeDisposable();
    private readonly MonotonicallyGrowingCache<TKey, IObservable<TItem>> _innerCache =
        new MonotonicallyGrowingCache<TKey, IObservable<TItem>>();

    /// <summary>
    /// Returns the shared observable for <paramref name="key"/>, building it from
    /// <paramref name="valueFactory"/> if the key has not been requested before.
    /// </summary>
    /// <param name="key">The key identifying the shared stream.</param>
    /// <param name="valueFactory">
    /// Produces the source observable for a key that is not cached yet. Must not be null.
    /// </param>
    /// <returns>
    /// An observable that, on each subscription, connects the underlying multicast if it
    /// is currently disconnected and then subscribes the observer to it.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="valueFactory"/> is null.</exception>
    public IObservable<TItem> FindOrCreate(TKey key, Func<IObservable<TItem>> valueFactory)
    {
        // Fail fast: invoking a null factory inside the inner cache's factory would
        // surface as a NullReferenceException raised from within the cache entry.
        if (valueFactory == null)
        {
            throw new ArgumentNullException("valueFactory");
        }

        return _innerCache.FindOrCreate(key, () =>
        {
            const int connected = 0;
            const int disconnected = 1;
            // Captured by the lambdas below and only changed through Interlocked calls.
            var state = disconnected;
            var entryResources = new SerialDisposable();
            _resources.Add(entryResources);
            var values = valueFactory();
            var cachedValues = values
                // Only a source error flips the entry back to disconnected; values and
                // completion leave the state untouched.
                .Do(_ => { }, ex => Interlocked.Exchange(ref state, disconnected))
                .Multicast(new ReplayLatestOnNextSubject<TItem>());
            Action reconnect = () =>
            {
                // Drop the previous connection before connecting again.
                // NOTE(review): presumably needed so Connect() opens a fresh connection
                // instead of returning the existing one; confirm against Rx's
                // connectable observable behavior.
                entryResources.Disposable = null;
                entryResources.Disposable = cachedValues.Connect();
            };
            var autoReconnectingValues =
                Observable.Defer(() =>
                {
                    // Only the subscriber that wins the disconnected -> connected
                    // transition performs the connect.
                    var oldState = Interlocked.CompareExchange(ref state, connected, disconnected);
                    var wasDisconnected = oldState == disconnected;
                    if (wasDisconnected)
                    {
                        reconnect();
                    }
                    return cachedValues;
                });
            return autoReconnectingValues;
        });
    }

    /// <summary>
    /// Disposes the connection holders created for all cached keys.
    /// </summary>
    public void Dispose()
    {
        _resources.Dispose();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment