Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
/// <summary>
/// Pass-through operator. Subscribing attaches a private <c>Boss</c> observer to the
/// wrapped source; that observer writes a message via <c>Debug.Log</c> for each
/// notification and then forwards the notification unchanged to the downstream observer.
/// </summary>
/// <typeparam name="T">Element type of the sequence (input and output are the same type).</typeparam>
public class BossObservable<T> : OperatorObservableBase<T>
{
    // Upstream sequence that SubscribeCore subscribes to.
    readonly IObservable<T> source;

    /// <summary>
    /// Wraps <paramref name="source"/>. The result of
    /// <c>source.IsRequiredSubscribeOnCurrentThread()</c> is handed to the base constructor.
    /// </summary>
    /// <param name="source">Upstream sequence to observe.</param>
    public BossObservable(IObservable<T> source)
        : base(source.IsRequiredSubscribeOnCurrentThread())
    {
        this.source = source;
    }

    /// <summary>
    /// Creates a new <c>Boss</c> around <paramref name="observer"/> and subscribes it to the source.
    /// </summary>
    /// <param name="observer">Downstream observer that receives the forwarded notifications.</param>
    /// <param name="cancel">Disposable passed through to the <c>Boss</c> base constructor.</param>
    /// <returns>Whatever the source's <c>Subscribe</c> call returns.</returns>
    protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
    {
        var boss = new Boss(observer, cancel);
        return this.source.Subscribe(boss);
    }

    /// <summary>
    /// Inner observer: logs one message per notification kind, then relays the notification.
    /// </summary>
    class Boss : OperatorObserverBase<T, T>
    {
        /// <summary>Logs a message on construction after handing both arguments to the base class.</summary>
        public Boss(IObserver<T> downstream, IDisposable cancellation)
            : base(downstream, cancellation)
        {
            Debug.Log("デキタヨ!");
        }

        /// <summary>Logs, then forwards <paramref name="value"/> downstream.</summary>
        public override void OnNext(T value)
        {
            Debug.Log("トオッタヨ!");
            this.observer.OnNext(value);
        }

        /// <summary>Logs, forwards <paramref name="error"/> downstream, then disposes.</summary>
        public override void OnError(Exception error)
        {
            Debug.Log("アワワワワワ");

            // finally: Dispose() runs even if the downstream handler throws.
            try
            {
                this.observer.OnError(error);
            }
            finally
            {
                Dispose();
            }
        }

        /// <summary>Logs, forwards completion downstream, then disposes.</summary>
        public override void OnCompleted()
        {
            Debug.Log("ボクニマカセテ!");

            // finally: Dispose() runs even if the downstream handler throws.
            try
            {
                this.observer.OnCompleted();
            }
            finally
            {
                Dispose();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment