Skip to content

Instantly share code, notes, and snippets.

@runceel
Created March 12, 2015 07:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save runceel/cf523b44be8cf01480b2 to your computer and use it in GitHub Desktop.
Save runceel/cf523b44be8cf01480b2 to your computer and use it in GitHub Desktop.
public static IObservable<PropertyPack<TElement, ReactiveProperty<TProperty>>> ObserveElementReactiveProperty<TCollection, TElement, TProperty>(this TCollection source, Expression<Func<TElement, ReactiveProperty<TProperty>>> propertySelector)
where TCollection : INotifyCollectionChanged, IEnumerable<TElement>
where TElement : class
{
if (source == null) throw new ArgumentNullException("source");
if (propertySelector == null) throw new ArgumentNullException("propertySelector");
var memberExpression = (MemberExpression)propertySelector.Body;
var propertyInfo = memberExpression.Member as PropertyInfo;
if (propertyInfo == null)
{
throw new ArgumentException("propertySelector is not property expression");
}
var propertyName = default(string); // no use
var getter = AccessorCache<TElement>.LookupGet(propertySelector, out propertyName);
return Observable.Create<PropertyPack<TElement, ReactiveProperty<TProperty>>>(observer =>
{
//--- cache element property subscriptions
var subscriptionCache = new Dictionary<TElement, IDisposable>();
//--- subscribe / unsubscribe property which all elements have
Action<IEnumerable<TElement>> subscribe = elements =>
{
foreach (var x in elements)
{
var rp = getter(x);
var subsctiption = rp.Subscribe(_ =>
{
var pair = PropertyPack.Create(x, propertyInfo, rp);
observer.OnNext(pair);
});
subscriptionCache.Add(x, subsctiption);
}
};
Action unsubscribeAll = () =>
{
foreach (var x in subscriptionCache.Values)
x.Dispose();
subscriptionCache.Clear();
};
subscribe(source);
//--- hook collection changed
var disposable = source.CollectionChangedAsObservable().Subscribe(x =>
{
if (x.Action == NotifyCollectionChangedAction.Remove
|| x.Action == NotifyCollectionChangedAction.Replace)
{
//--- unsubscribe
var oldItems = x.OldItems.Cast<TElement>();
foreach (var y in oldItems)
{
subscriptionCache[y].Dispose();
subscriptionCache.Remove(y);
}
}
if (x.Action == NotifyCollectionChangedAction.Add
|| x.Action == NotifyCollectionChangedAction.Replace)
{
var newItems = x.NewItems.Cast<TElement>();
subscribe(newItems);
}
if (x.Action == NotifyCollectionChangedAction.Reset)
{
unsubscribeAll();
subscribe(source);
}
});
//--- unsubscribe
return Disposable.Create(() =>
{
disposable.Dispose();
unsubscribeAll();
});
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment