-
-
Save scho/7789e73c873d44dd6895113895cfc606 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Linq; | |
using UniRx; | |
public class CollectionObservable<T, TValue> : IObservable<TValue> where T : class | |
{ | |
private readonly IReadOnlyReactiveCollection<T> _collection; | |
private readonly Func<T, IObservable<TValue>> _observableFunc; | |
private readonly Subject<TValue> _subject; | |
private readonly Disposables _itemDisposables; | |
private readonly CompositeDisposable _collectionDisposables; | |
public CollectionObservable(IReadOnlyReactiveCollection<T> collection, | |
Func<T, IObservable<TValue>> observableFunc) | |
{ | |
_collection = collection; | |
_observableFunc = observableFunc; | |
_collectionDisposables = new CompositeDisposable(); | |
_itemDisposables = new Disposables(); | |
_subject = new Subject<TValue>(); | |
} | |
public IDisposable Subscribe(IObserver<TValue> observer) | |
{ | |
try | |
{ | |
return new DisposableSubject<TValue>(_subject, | |
new CompositeDisposable {_collectionDisposables, _itemDisposables}).Subscribe(observer); | |
} | |
finally | |
{ | |
SubscribeCollectionItems(); | |
SubscribeToCollectionChanges(); | |
} | |
} | |
private void SubscribeCollectionItems() | |
{ | |
_collection.Select(_observableFunc).ForEach(observable => | |
_itemDisposables.Add(observable.Subscribe(value => _subject.OnNext(value)))); | |
} | |
private void SubscribeToCollectionChanges() | |
{ | |
_collection.ObserveAdd().Subscribe(add => | |
_itemDisposables.Insert(add.Index, | |
_observableFunc(add.Value).Subscribe(value => _subject.OnNext(value)))); | |
_collection.ObserveRemove().Subscribe(remove => _itemDisposables.DisposeAt(remove.Index)) | |
.AddTo(_collectionDisposables); | |
_collection.ObserveReplace().Subscribe(replace => | |
{ | |
_itemDisposables.DisposeAt(replace.Index); | |
_itemDisposables.Insert(replace.Index, | |
_observableFunc(replace.NewValue).Subscribe(value => _subject.OnNext(value))); | |
}).AddTo(_collectionDisposables); | |
_collection.ObserveReset().Subscribe(reset => | |
{ | |
_itemDisposables.Dispose(); | |
SubscribeCollectionItems(); | |
}).AddTo(_collectionDisposables); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using UniRx; | |
public class DisposableSubject<TValue> : IObservable<TValue> | |
{ | |
private readonly ISubject<TValue> _subject; | |
private readonly IDisposable _outerDisposable; | |
public DisposableSubject(ISubject<TValue> subject, IDisposable outerDisposable) | |
{ | |
_subject = subject; | |
_outerDisposable = outerDisposable; | |
} | |
public IDisposable Subscribe(IObserver<TValue> observer) | |
{ | |
var disposables = new CompositeDisposable {_outerDisposable}; | |
_subject.Subscribe(observer).AddTo(disposables); | |
return disposables; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment