Skip to content

Instantly share code, notes, and snippets.

@scho
Created April 12, 2021 16:01
Show Gist options
  • Save scho/7789e73c873d44dd6895113895cfc606 to your computer and use it in GitHub Desktop.
Save scho/7789e73c873d44dd6895113895cfc606 to your computer and use it in GitHub Desktop.
using System;
using System.Linq;
using UniRx;
public class CollectionObservable<T, TValue> : IObservable<TValue> where T : class
{
private readonly IReadOnlyReactiveCollection<T> _collection;
private readonly Func<T, IObservable<TValue>> _observableFunc;
private readonly Subject<TValue> _subject;
private readonly Disposables _itemDisposables;
private readonly CompositeDisposable _collectionDisposables;
public CollectionObservable(IReadOnlyReactiveCollection<T> collection,
Func<T, IObservable<TValue>> observableFunc)
{
_collection = collection;
_observableFunc = observableFunc;
_collectionDisposables = new CompositeDisposable();
_itemDisposables = new Disposables();
_subject = new Subject<TValue>();
}
public IDisposable Subscribe(IObserver<TValue> observer)
{
try
{
return new DisposableSubject<TValue>(_subject,
new CompositeDisposable {_collectionDisposables, _itemDisposables}).Subscribe(observer);
}
finally
{
SubscribeCollectionItems();
SubscribeToCollectionChanges();
}
}
private void SubscribeCollectionItems()
{
_collection.Select(_observableFunc).ForEach(observable =>
_itemDisposables.Add(observable.Subscribe(value => _subject.OnNext(value))));
}
private void SubscribeToCollectionChanges()
{
_collection.ObserveAdd().Subscribe(add =>
_itemDisposables.Insert(add.Index,
_observableFunc(add.Value).Subscribe(value => _subject.OnNext(value))));
_collection.ObserveRemove().Subscribe(remove => _itemDisposables.DisposeAt(remove.Index))
.AddTo(_collectionDisposables);
_collection.ObserveReplace().Subscribe(replace =>
{
_itemDisposables.DisposeAt(replace.Index);
_itemDisposables.Insert(replace.Index,
_observableFunc(replace.NewValue).Subscribe(value => _subject.OnNext(value)));
}).AddTo(_collectionDisposables);
_collection.ObserveReset().Subscribe(reset =>
{
_itemDisposables.Dispose();
SubscribeCollectionItems();
}).AddTo(_collectionDisposables);
}
}
using System;
using UniRx;
public class DisposableSubject<TValue> : IObservable<TValue>
{
private readonly ISubject<TValue> _subject;
private readonly IDisposable _outerDisposable;
public DisposableSubject(ISubject<TValue> subject, IDisposable outerDisposable)
{
_subject = subject;
_outerDisposable = outerDisposable;
}
public IDisposable Subscribe(IObserver<TValue> observer)
{
var disposables = new CompositeDisposable {_outerDisposable};
_subject.Subscribe(observer).AddTo(disposables);
return disposables;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment