Skip to content

Instantly share code, notes, and snippets.

@JvmName
Last active April 12, 2018 05:45
Show Gist options
  • Save JvmName/cef01863de4e0948ab63875555f8a4a0 to your computer and use it in GitHub Desktop.
Save JvmName/cef01863de4e0948ab63875555f8a4a0 to your computer and use it in GitHub Desktop.
A quick-n-dirty hybrid of RxJava's Map + Filter operators
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.plugins.RxJavaPlugins;
public class ObservableFilterMap<In, Out> extends Observable<Out> {
private final Function<In, Out> mapper;
private final Predicate<In> filter;
public ObservableFilterMap(Predicate<In> filter, Function<In, Out> mapper) {
this.filter = filter;
this.mapper = mapper;
}
@Override
protected void subscribeActual(Observer<? super Out> observer) {
new MapFilterObserver<>(observer, filter, mapper);
}
static final class MapFilterObserver<In, Out> implements Observer<In>, Disposable{
Disposable s;
private final Observer<? super Out> downstream;
private final Predicate<In> filter;
private final Function<In, Out> mapper;
public MapFilterObserver(Observer<? super Out> downstream, Predicate<In> filter, Function<In, Out> mapper) {
this.downstream = downstream;
this.filter = filter;
this.mapper = mapper;
}
@Override
public void onSubscribe(@NonNull Disposable d) {
if (DisposableHelper.validate(this.s, d)) {
this.s = d;
downstream.onSubscribe(this);
}
}
@Override
public void onNext(@NonNull In t) {
boolean b;
try {
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return;
}
if (b) {
Out v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
}
@Override
public void onError(@NonNull Throwable e) {
downstream.onError(e);
}
@Override
public void onComplete() {
downstream.onComplete();
}
@Override
public void dispose() {
s.dispose();
}
@Override
public boolean isDisposed() {
return s.isDisposed();
}
protected final void fail(Throwable t) {
Exceptions.throwIfFatal(t);
s.dispose();
onError(t);
}
}
}
@JvmName
Copy link
Author

JvmName commented Apr 12, 2018

thanks to @artem-zinnatullin for the suggestions!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment