Skip to content

Instantly share code, notes, and snippets.

@eeichinger
Created May 20, 2015 05:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eeichinger/b5c9edd10e77b7b8bc62 to your computer and use it in GitHub Desktop.
Save eeichinger/b5c9edd10e77b7b8bc62 to your computer and use it in GitHub Desktop.
an rxjava catch-like operator
package myrx;
import org.junit.Test;
import rx.Observable;
import rx.Producer;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.functions.Func1;
import rx.plugins.RxJavaPlugins;
import static myrx.CatchOperatorTest.OperatorCatch.onException;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static rx.Observable.just;
/**
* @author Erich Eichinger
* @since 19/05/2015
*/
public class CatchOperatorTest {
@Test
public void should_be_2() {
final String xIn = "1";
int x = calc(xIn, Integer::parseInt, (i) -> true).toBlocking().first();
assertThat(x, equalTo(2));
}
@Test
public void should_be_1() {
final String xIn = "1";
int x = calc(xIn, Integer::parseInt, (i) -> false).toBlocking().firstOrDefault(1);
assertThat(x, equalTo(1));
}
@Test
public void should_catch_exception() {
final String xIn = "1";
Observable<Integer> calc = calc(xIn, this::throwError, (i) -> true);
calc = calc.lift(onException(MyException.class, (e) -> Integer.parseInt(e.getVal())));
int x = calc.toBlocking().single();
assertThat(x, equalTo(1));
}
@Test(expected = OtherException.class)
public void should_not_catch_exception() {
final String xIn = "1";
Observable<Integer> calc = calc(
xIn
, (s) -> {
throw new OtherException("other");
}
, (i) -> true)
.lift(onException(MyException.class, (e) -> Integer.parseInt(e.getVal())));
calc.toBlocking().single();
}
@Test(expected = OtherException.class)
public void should_rethrow_exception() {
final String xIn = "1";
Observable<Integer> calc = calc(xIn, (s) -> {
throw new MyException("mine");
}, (i) -> true);
calc = calc.lift(onException(MyException.class, (e) -> {
throw new OtherException(e.getVal());
}));
calc.toBlocking().single();
}
@Test
public void should_catch_multiple_exception() {
final String xIn = "1";
Observable<Integer> calc = calc(xIn, (s) -> {
throw new OtherException("other");
}, (i) -> true);
calc = calc
.lift(onException(MyException.class, (e) -> 4))
.lift(onException(OtherException.class, (e) -> 5));
int res = calc.toBlocking().single();
assertEquals(5, res);
}
private <T1, R> R throwError(T1 in) {
throw new MyException(in);
}
private Observable<Integer> calc(String xIn, Func1<String, Integer> parse, Func1<Integer, Boolean> predicate) {
return just(xIn)
.map(parse)
.filter(predicate)
.map(x -> x * 2);
}
static class MyException extends RuntimeException {
private final Object val;
public MyException(Object val) {
this.val = val;
}
@SuppressWarnings("unchecked")
public <T> T getVal() {
return (T) val;
}
}
static class OtherException extends RuntimeException {
private final Object val;
public OtherException(Object val) {
this.val = val;
}
@SuppressWarnings("unchecked")
public <T> T getVal() {
return (T) val;
}
}
public static final class OperatorCatch<T, E extends Throwable> implements Observable.Operator<T, T> {
final Class<E> exceptionClazz;
final Func1<E, ? extends T> resultFunction;
public static <T, E extends Throwable> OperatorCatch<T, E> onException(
Class<E> eClass,
Func1<E, ? extends T> resumeFunction
) {
return new OperatorCatch<T, E>(eClass, resumeFunction);
}
public OperatorCatch(Class<E> exceptionClazz, Func1<E, ? extends T> resultFunction) {
this.exceptionClazz = exceptionClazz;
this.resultFunction = resultFunction;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
Subscriber<T> parent = new Subscriber<T>() {
private boolean done = false;
@Override
public void onNext(T t) {
if (done) {
return;
}
child.onNext(t);
}
@Override
public void onError(Throwable e) {
if (done) {
Exceptions.throwIfFatal(e);
return;
}
done = true;
if (!exceptionClazz.isInstance(e)) {
child.onError(e);
return;
}
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
unsubscribe();
T result = resultFunction.call((E) e);
child.onNext(result);
} catch (Throwable x) {
child.onError(x);
return;
}
child.onCompleted();
}
@Override
public void onCompleted() {
if (done) {
return;
}
done = true;
child.onCompleted();
}
@Override
public void setProducer(final Producer producer) {
child.setProducer(new Producer() {
@Override
public void request(long n) {
producer.request(n);
}
});
}
};
child.add(parent);
return parent;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment