Skip to content

Instantly share code, notes, and snippets.

@rightfold
Last active November 6, 2015 16:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rightfold/7ed6d07f824e8503e4c4 to your computer and use it in GitHub Desktop.
Save rightfold/7ed6d07f824e8503e4c4 to your computer and use it in GitHub Desktop.
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
internal sealed class ObservableAdresses : IDisposable
{
private readonly IDisposable _connection;
private bool _disposed;
public ObservableAdresses(IPoller poller)
{
var onEvent = Observable.Create<IAddressAndValue>(o => {
EventHandler<IAddressAndValue> handler = (_, e) => o.OnNext(e);
poller.Read += handler;
return Disposable.Create(() => poller.Read -= handler);
}).Publish();
_connection = onEvent.Connect();
FastAdresses = new RateAdresses(onEvent, UpdateRate.High);
SlowAdresses = new RateAdresses(onEvent, UpdateRate.Low);
}
public RateAdresses FastAdresses { get; }
public RateAdresses SlowAdresses { get; }
public void Dispose()
{
if (_disposed)
{
return;
}
_disposed = true;
_connection.Dispose();
}
private void VerifyDisposed()
{
if (_disposed)
{
throw new ObjectDisposedException(GetType().FullName);
}
}
}
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Reactive.Linq;
internal sealed class RateAdresses
{
private readonly IObservable<IAddressAndValue> _onEvent;
private ImmutableDictionary<IAddress, IObservable<IAddressAndValue>> _adressObservables = ImmutableDictionary<IAddress, IObservable<IAddressAndValue>>.Empty;
private readonly object _gate = new object();
internal RateAdresses(IObservable<IAddressAndValue> onEvent, UpdateRate rate)
{
Rate = rate;
_onEvent = onEvent;
}
internal event EventHandler<IEnumerable<IAddress>> AdressesChanged;
public UpdateRate Rate { get; }
internal IObservable<IAddressAndValue<T>> Observe<T>(IAddress<T> address)
{
return Observable.Defer(() => GetOrCreateObservable(address));
}
private IObservable<IAddressAndValue<T>> GetOrCreateObservable<T>(IAddress<T> address)
{
IObservable<IAddressAndValue> observable;
if (!_adressObservables.TryGetValue(address, out observable))
{
bool added = false;
lock (_gate)
{
if (!_adressObservables.TryGetValue(address, out observable))
{
added = true;
observable = _onEvent.OfType<IAddressAndValue>()
.Where(x => Equals(x.Address, address))
.Publish()
.RefCount()
.Finally(() => Remove(address));
_adressObservables = _adressObservables.Add(address, observable);
}
}
if (added)
{
OnAdressesChanged();
}
}
return observable.OfType<IAddressAndValue<T>>()
.Distinct(x => x.Value);
}
private void Remove(IAddress address)
{
lock (_gate)
{
_adressObservables = _adressObservables.Remove(address);
}
OnAdressesChanged();
}
private void OnAdressesChanged()
{
AdressesChanged?.Invoke(this, _adressObservables.Keys);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment