Created
May 20, 2015 05:56
-
-
Save eeichinger/b5c9edd10e77b7b8bc62 to your computer and use it in GitHub Desktop.
An RxJava catch-like operator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package myrx; | |
import org.junit.Test; | |
import rx.Observable; | |
import rx.Producer; | |
import rx.Subscriber; | |
import rx.exceptions.Exceptions; | |
import rx.functions.Func1; | |
import rx.plugins.RxJavaPlugins; | |
import static myrx.CatchOperatorTest.OperatorCatch.onException; | |
import static org.hamcrest.Matchers.*; | |
import static org.junit.Assert.assertEquals; | |
import static org.junit.Assert.assertThat; | |
import static rx.Observable.just; | |
/** | |
* @author Erich Eichinger | |
* @since 19/05/2015 | |
*/ | |
public class CatchOperatorTest { | |
@Test | |
public void should_be_2() { | |
final String xIn = "1"; | |
int x = calc(xIn, Integer::parseInt, (i) -> true).toBlocking().first(); | |
assertThat(x, equalTo(2)); | |
} | |
@Test | |
public void should_be_1() { | |
final String xIn = "1"; | |
int x = calc(xIn, Integer::parseInt, (i) -> false).toBlocking().firstOrDefault(1); | |
assertThat(x, equalTo(1)); | |
} | |
@Test | |
public void should_catch_exception() { | |
final String xIn = "1"; | |
Observable<Integer> calc = calc(xIn, this::throwError, (i) -> true); | |
calc = calc.lift(onException(MyException.class, (e) -> Integer.parseInt(e.getVal()))); | |
int x = calc.toBlocking().single(); | |
assertThat(x, equalTo(1)); | |
} | |
@Test(expected = OtherException.class) | |
public void should_not_catch_exception() { | |
final String xIn = "1"; | |
Observable<Integer> calc = calc( | |
xIn | |
, (s) -> { | |
throw new OtherException("other"); | |
} | |
, (i) -> true) | |
.lift(onException(MyException.class, (e) -> Integer.parseInt(e.getVal()))); | |
calc.toBlocking().single(); | |
} | |
@Test(expected = OtherException.class) | |
public void should_rethrow_exception() { | |
final String xIn = "1"; | |
Observable<Integer> calc = calc(xIn, (s) -> { | |
throw new MyException("mine"); | |
}, (i) -> true); | |
calc = calc.lift(onException(MyException.class, (e) -> { | |
throw new OtherException(e.getVal()); | |
})); | |
calc.toBlocking().single(); | |
} | |
@Test | |
public void should_catch_multiple_exception() { | |
final String xIn = "1"; | |
Observable<Integer> calc = calc(xIn, (s) -> { | |
throw new OtherException("other"); | |
}, (i) -> true); | |
calc = calc | |
.lift(onException(MyException.class, (e) -> 4)) | |
.lift(onException(OtherException.class, (e) -> 5)); | |
int res = calc.toBlocking().single(); | |
assertEquals(5, res); | |
} | |
private <T1, R> R throwError(T1 in) { | |
throw new MyException(in); | |
} | |
private Observable<Integer> calc(String xIn, Func1<String, Integer> parse, Func1<Integer, Boolean> predicate) { | |
return just(xIn) | |
.map(parse) | |
.filter(predicate) | |
.map(x -> x * 2); | |
} | |
static class MyException extends RuntimeException { | |
private final Object val; | |
public MyException(Object val) { | |
this.val = val; | |
} | |
@SuppressWarnings("unchecked") | |
public <T> T getVal() { | |
return (T) val; | |
} | |
} | |
static class OtherException extends RuntimeException { | |
private final Object val; | |
public OtherException(Object val) { | |
this.val = val; | |
} | |
@SuppressWarnings("unchecked") | |
public <T> T getVal() { | |
return (T) val; | |
} | |
} | |
public static final class OperatorCatch<T, E extends Throwable> implements Observable.Operator<T, T> { | |
final Class<E> exceptionClazz; | |
final Func1<E, ? extends T> resultFunction; | |
public static <T, E extends Throwable> OperatorCatch<T, E> onException( | |
Class<E> eClass, | |
Func1<E, ? extends T> resumeFunction | |
) { | |
return new OperatorCatch<T, E>(eClass, resumeFunction); | |
} | |
public OperatorCatch(Class<E> exceptionClazz, Func1<E, ? extends T> resultFunction) { | |
this.exceptionClazz = exceptionClazz; | |
this.resultFunction = resultFunction; | |
} | |
@Override | |
public Subscriber<? super T> call(final Subscriber<? super T> child) { | |
Subscriber<T> parent = new Subscriber<T>() { | |
private boolean done = false; | |
@Override | |
public void onNext(T t) { | |
if (done) { | |
return; | |
} | |
child.onNext(t); | |
} | |
@Override | |
public void onError(Throwable e) { | |
if (done) { | |
Exceptions.throwIfFatal(e); | |
return; | |
} | |
done = true; | |
if (!exceptionClazz.isInstance(e)) { | |
child.onError(e); | |
return; | |
} | |
try { | |
RxJavaPlugins.getInstance().getErrorHandler().handleError(e); | |
unsubscribe(); | |
T result = resultFunction.call((E) e); | |
child.onNext(result); | |
} catch (Throwable x) { | |
child.onError(x); | |
return; | |
} | |
child.onCompleted(); | |
} | |
@Override | |
public void onCompleted() { | |
if (done) { | |
return; | |
} | |
done = true; | |
child.onCompleted(); | |
} | |
@Override | |
public void setProducer(final Producer producer) { | |
child.setProducer(new Producer() { | |
@Override | |
public void request(long n) { | |
producer.request(n); | |
} | |
}); | |
} | |
}; | |
child.add(parent); | |
return parent; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment