Created
April 7, 2014 23:31
-
-
Save ZachBray/10073398 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reactive; | |
using System.Reactive.Linq; | |
using System.Reactive.Disposables; | |
using System.Reactive.Subjects; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace Cache | |
{ | |
/// <summary>
/// Runs scheduled actions one at a time, in the order they were enqueued.
/// </summary>
/// <remarks>
/// There is no dedicated worker thread: the caller whose <c>Schedule</c> call moves the
/// pending count from 0 to 1 drains the queue on its own thread. Calls made while a drain
/// is in progress (from other threads, or re-entrantly from inside an action) only enqueue
/// and return; the draining caller picks their actions up.
/// </remarks>
public class SerialActionScheduler
{
    private readonly ConcurrentQueue<Action> _actions = new ConcurrentQueue<Action>();

    // Number of actions that have been enqueued and have not yet finished running.
    private int _actionCount;

    /// <summary>
    /// Enqueues <paramref name="action"/> and, if no drain is currently in progress,
    /// runs it (and anything enqueued in the meantime) before returning.
    /// </summary>
    /// <param name="action">The work item to run serially.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    /// <exception cref="AggregateException">
    /// More than one action failed during the drain performed by this call. A single
    /// failure is rethrown as-is.
    /// </exception>
    public void Schedule(Action action)
    {
        // Reject null here: a null entry would otherwise blow up in the middle of a drain.
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        // Enqueue BEFORE incrementing so that a non-zero count always implies a queued item.
        _actions.Enqueue(action);

        // The 0 -> 1 transition means nobody is draining; this caller becomes the drainer.
        if (Interlocked.Increment(ref _actionCount) == 1)
        {
            ProcessActionsSerially();
        }
    }

    /// <summary>
    /// Drains the queue until the pending count returns to zero. A failing action does not
    /// stop the drain; failures are collected and rethrown once the queue is empty, so the
    /// pending count always stays in step with the queue and later calls are never stranded.
    /// </summary>
    private void ProcessActionsSerially()
    {
        List<Exception> failures = null;
        int remaining;
        do
        {
            try
            {
                Action nextAction;
                if (_actions.TryDequeue(out nextAction))
                {
                    nextAction();
                }
            }
            catch (Exception ex)
            {
                if (failures == null)
                {
                    failures = new List<Exception>();
                }
                failures.Add(ex);
            }

            // Decrement only after the action has finished: that is what keeps other
            // callers from starting a second, concurrent drain.
            remaining = Interlocked.Decrement(ref _actionCount);
        }
        while (remaining != 0);

        if (failures == null)
        {
            return;
        }

        if (failures.Count == 1)
        {
            // Rethrow the original exception object with its original stack trace.
            System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(failures[0]).Throw();
        }

        throw new AggregateException(failures);
    }
}
/// <summary>
/// Subject that forwards notifications to its current observers and hands the most recent
/// OnNext value (if there has been one) to each observer when it subscribes.
/// </summary>
/// <remarks>
/// Every read and write of the observer set and the cached value is funnelled through a
/// private <c>SerialActionScheduler</c>. OnCompleted and OnError notify the current
/// observers and then clear the observer set; the cached value is left in place and the
/// subject keeps accepting notifications and subscriptions afterwards.
/// </remarks>
/// <typeparam name="TItem">Type of the values flowing through the subject.</typeparam>
public class ReplayLatestOnNextSubject<TItem> : ISubject<TItem>
{
    private readonly HashSet<IObserver<TItem>> _observers = new HashSet<IObserver<TItem>>();
    private readonly SerialActionScheduler _serialScheduler = new SerialActionScheduler();
    private TItem _cachedItem;
    private bool _hasCachedItem;

    /// <summary>Signals completion to every current observer, then forgets them.</summary>
    public void OnCompleted()
    {
        _serialScheduler.Schedule(
            () => NotifyThenForgetAll(subscriber => subscriber.OnCompleted()));
    }

    /// <summary>Signals <paramref name="error"/> to every current observer, then forgets them.</summary>
    /// <param name="error">The exception passed on to each observer.</param>
    public void OnError(Exception error)
    {
        _serialScheduler.Schedule(
            () => NotifyThenForgetAll(subscriber => subscriber.OnError(error)));
    }

    /// <summary>
    /// Pushes <paramref name="value"/> to every current observer and then records it as the
    /// value to replay to future subscribers.
    /// </summary>
    /// <param name="value">The value to publish and cache.</param>
    public void OnNext(TItem value)
    {
        _serialScheduler.Schedule(() =>
        {
            NotifyAll(subscriber => subscriber.OnNext(value));

            // The cache is updated only after the observers have been notified.
            _cachedItem = value;
            _hasCachedItem = true;
        });
    }

    /// <summary>
    /// Registers <paramref name="observer"/>, first replaying the cached value to it when
    /// one exists.
    /// </summary>
    /// <param name="observer">The observer to register.</param>
    /// <returns>A disposable that schedules removal of the observer when disposed.</returns>
    public IDisposable Subscribe(IObserver<TItem> observer)
    {
        Action attach = () =>
        {
            if (_hasCachedItem)
            {
                observer.OnNext(_cachedItem);
            }

            _observers.Add(observer);
        };
        Action detach = () => _observers.Remove(observer);

        _serialScheduler.Schedule(attach);
        return Disposable.Create(() => _serialScheduler.Schedule(detach));
    }

    // Invokes the given notification on each registered observer.
    private void NotifyAll(Action<IObserver<TItem>> notify)
    {
        foreach (var subscriber in _observers)
        {
            notify(subscriber);
        }
    }

    // Invokes the given notification on each registered observer, then empties the set.
    private void NotifyThenForgetAll(Action<IObserver<TItem>> notify)
    {
        NotifyAll(notify);
        _observers.Clear();
    }
}
/// <summary>
/// Key/value cache that only ever grows: entries are created on first request and this
/// class offers no way to remove or replace them.
/// </summary>
/// <typeparam name="TKey">Type of the lookup key.</typeparam>
/// <typeparam name="TItem">Type of the cached value.</typeparam>
public class MonotonicallyGrowingCache<TKey, TItem>
{
    // Values are wrapped in Lazy so that only the factory belonging to the entry that
    // actually wins the insert is ever executed.
    private readonly ConcurrentDictionary<TKey, Lazy<TItem>> _valueLookup = new ConcurrentDictionary<TKey, Lazy<TItem>>();

    /// <summary>
    /// Returns the value cached for <paramref name="key"/>, creating it with
    /// <paramref name="valueFactory"/> if the key has not been seen before.
    /// </summary>
    /// <param name="key">The key to look up.</param>
    /// <param name="valueFactory">
    /// Produces the value on a miss. Ignored when the key is already present.
    /// </param>
    /// <returns>The cached (or newly created) value.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="valueFactory"/> is null (checked on every call, hit or miss).
    /// </exception>
    public TItem FindOrCreate(TKey key, Func<TItem> valueFactory)
    {
        if (valueFactory == null)
        {
            throw new ArgumentNullException("valueFactory");
        }

        // Fast path: an existing entry needs neither a new Lazy nor a second lookup.
        Lazy<TItem> entry;
        if (!_valueLookup.TryGetValue(key, out entry))
        {
            // GetOrAdd returns whichever Lazy ended up stored; if another caller won the
            // race, the Lazy built here is dropped without its factory being run.
            entry = _valueLookup.GetOrAdd(key, new Lazy<TItem>(valueFactory));
        }

        return entry.Value;
    }
}
/// <summary>
/// Caches one shared observable per key. Each cached observable multicasts its source
/// through a <c>ReplayLatestOnNextSubject</c> and (re)connects to the source when
/// someone subscribes while the entry is in the disconnected state.
/// </summary>
/// <remarks>
/// An entry starts disconnected and returns to disconnected when its source signals an
/// error. Completion of the source does not change the state. Dispose the cache to release
/// every source connection it has opened.
/// </remarks>
/// <typeparam name="TKey">Type of the lookup key.</typeparam>
/// <typeparam name="TItem">Type of the values produced by the cached observables.</typeparam>
public class ObservableLatestOnNextCache<TKey, TItem> : IDisposable
{
    // Holds one SerialDisposable per entry, each wrapping that entry's current connection.
    private readonly CompositeDisposable _resources = new CompositeDisposable();
    private readonly MonotonicallyGrowingCache<TKey, IObservable<TItem>> _innerCache =
        new MonotonicallyGrowingCache<TKey, IObservable<TItem>>();

    /// <summary>
    /// Returns the shared observable for <paramref name="key"/>, building it from
    /// <paramref name="valueFactory"/> the first time the key is requested.
    /// </summary>
    /// <param name="key">The key identifying the stream.</param>
    /// <param name="valueFactory">Creates the source observable on a cache miss.</param>
    /// <returns>
    /// An observable that, on each subscription, connects the shared source if it is
    /// currently disconnected and then subscribes the caller to the shared stream.
    /// </returns>
    public IObservable<TItem> FindOrCreate(TKey key, Func<IObservable<TItem>> valueFactory)
    {
        return _innerCache.FindOrCreate(key, () =>
        {
            const int connected = 0;
            const int disconnected = 1;

            // Per-entry connection flag, shared by the closures below and only touched
            // through Interlocked operations.
            var state = disconnected;

            var entryResources = new SerialDisposable();
            _resources.Add(entryResources);

            var values = valueFactory();
            var cachedValues = values
                // A source error flips the entry back to disconnected so that the next
                // subscription triggers a reconnect. No completion handler is supplied.
                .Do(_ => { }, ex => Interlocked.Exchange(ref state, disconnected))
                .Multicast(new ReplayLatestOnNextSubject<TItem>());

            Action reconnect = () =>
            {
                // Dispose the previous connection before opening a new one.
                // NOTE(review): presumably needed so Connect() creates a fresh
                // subscription rather than returning the old one - confirm against Rx.
                entryResources.Disposable = null;
                entryResources.Disposable = cachedValues.Connect();
            };

            var autoReconnectingValues =
                Observable.Defer(() =>
                {
                    // Atomically claim the disconnected -> connected transition so that
                    // only one of several simultaneous subscribers performs the reconnect.
                    var oldState = Interlocked.CompareExchange(ref state, connected, disconnected);
                    var wasDisconnected = oldState == disconnected;
                    if (wasDisconnected)
                    {
                        reconnect();
                    }

                    return cachedValues;
                });

            return autoReconnectingValues;
        });
    }

    /// <summary>
    /// Disposes every source connection opened by this cache.
    /// </summary>
    public void Dispose()
    {
        _resources.Dispose();
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment