Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
/// <summary>
/// Observable that republishes the values of the wrapped source as a sliding window
/// holding up to <c>count</c> of the most recent values.
/// </summary>
/// <remarks>
/// Once <c>count</c> values have been collected, a window is emitted and the oldest value is
/// dropped, so every following source value yields a window of the latest <c>count</c> values.
/// When the source completes, any values still buffered are emitted as one final (shorter)
/// window before completion is forwarded.
/// Each emitted window is an independent copy, so subscribers may keep or modify it freely.
/// Edge cases: with <c>count == 0</c> the size check never matches, so all values are
/// accumulated and emitted once on completion; a negative <c>count</c> makes the
/// <see cref="List{T}"/> capacity constructor throw when a subscription is started.
/// </remarks>
/// <typeparam name="T">Element type of the source sequence.</typeparam>
class CacheObservable<T> : OperatorObservableBase<IList<T>>
{
    readonly UniRx.IObservable<T> source;
    readonly int count;

    /// <summary>Creates the operator over <paramref name="source"/>.</summary>
    /// <param name="source">Sequence whose values are windowed.</param>
    /// <param name="count">Number of values per full window.</param>
    public CacheObservable(UniRx.IObservable<T> source, int count)
        : base(source.IsRequiredSubscribeOnCurrentThread())
    {
        this.source = source;
        this.count = count;
    }

    /// <summary>Starts a new windowing observer for <paramref name="observer"/>.</summary>
    protected override IDisposable SubscribeCore(UniRx.IObserver<IList<T>> observer, IDisposable cancel)
    {
        return new CacheImpl(this, observer, cancel).Run();
    }

    // count only
    /// <summary>Per-subscription observer that maintains the sliding window.</summary>
    class CacheImpl : OperatorObserverBase<T, IList<T>>
    {
        readonly CacheObservable<T> parent;

        // Internal buffer; never handed to the downstream observer directly.
        List<T> list;

        public CacheImpl(CacheObservable<T> parent, UniRx.IObserver<IList<T>> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            this.parent = parent;
        }

        /// <summary>Allocates the buffer and subscribes this observer to the source.</summary>
        public IDisposable Run()
        {
            list = new List<T>(parent.count);
            return parent.source.Subscribe(this);
        }

        /// <summary>
        /// Buffers <paramref name="value"/>; when the buffer reaches the configured size,
        /// emits a copy of it and drops the oldest element.
        /// </summary>
        public override void OnNext(T value)
        {
            list.Add(value);
            if (list.Count == parent.count)
            {
                // Emit a snapshot: the internal buffer is mutated right below and on every
                // later value, which would otherwise alter windows subscribers already hold.
                observer.OnNext(new List<T>(list));
                list.RemoveAt(0);
            }
        }

        /// <summary>Forwards the error downstream and then disposes this observer.</summary>
        public override void OnError(Exception error)
        {
            try
            {
                observer.OnError(error);
            }
            finally
            {
                Dispose();
            }
        }

        /// <summary>
        /// Emits any remaining buffered values as a final window, forwards completion,
        /// and then disposes this observer.
        /// </summary>
        public override void OnCompleted()
        {
            try
            {
                // The flush is inside the try so Dispose() still runs if the downstream
                // observer throws while handling the final window.
                if (list.Count > 0)
                {
                    observer.OnNext(new List<T>(list));
                }

                observer.OnCompleted();
            }
            finally
            {
                Dispose();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment