@akarnokd
Last active Mar 23, 2021
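// Returns a Publisher that applies mapper to every item emitted by upstream.
static <T, R> Publisher<R> map(Publisher<T> upstream, Function<T, R> mapper) {
    // Each downstream subscription gets its own Mapper subscribed to upstream.
    return downstream -> upstream.subscribe(new Mapper<>(downstream, mapper));
}

// Subscriber that sits between upstream and downstream and transforms each item.
class Mapper<T, R> implements Subscriber<T> {

    final Subscriber<? super R> downstream;
    final Function<T, R> mapper;

    Mapper(Subscriber<? super R> downstream, Function<T, R> mapper) {
        this.downstream = downstream;
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Subscription s) {
        // The upstream Subscription is passed through unchanged, so downstream
        // requests and cancellation go directly to upstream.
        downstream.onSubscribe(s);
    }

    @Override
    public void onNext(T item) {
        // One item in, one mapped item out.
        downstream.onNext(mapper.apply(item));
    }

    @Override
    public void onError(Throwable throwable) {
        downstream.onError(throwable);
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }
}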
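
To see the operator above run end to end, here is a minimal, self-contained sketch. It assumes the JDK's `java.util.concurrent.Flow` interfaces (the snippet does not say which `Publisher` type it targets; `org.reactivestreams` has the same shape). `MapDemo`, `fromList`, and `demo` are hypothetical names introduced only for this illustration, and `fromList` is a deliberately simplified synchronous source, not a specification-compliant publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.function.Function;

public class MapDemo {

    // Same operator as in the snippet, typed against java.util.concurrent.Flow (assumption).
    static <T, R> Publisher<R> map(Publisher<T> upstream, Function<T, R> mapper) {
        return downstream -> upstream.subscribe(new Mapper<>(downstream, mapper));
    }

    static class Mapper<T, R> implements Subscriber<T> {
        final Subscriber<? super R> downstream;
        final Function<T, R> mapper;

        Mapper(Subscriber<? super R> downstream, Function<T, R> mapper) {
            this.downstream = downstream;
            this.mapper = mapper;
        }

        @Override public void onSubscribe(Subscription s) { downstream.onSubscribe(s); }
        @Override public void onNext(T item) { downstream.onNext(mapper.apply(item)); }
        @Override public void onError(Throwable t) { downstream.onError(t); }
        @Override public void onComplete() { downstream.onComplete(); }
    }

    // Hypothetical helper: emits the list's items synchronously as they are requested.
    // Simplified for the demo: no cancellation support and no reentrancy protection.
    static <T> Publisher<T> fromList(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Subscription() {
            int index;
            boolean done;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && index < items.size(); i++) {
                    subscriber.onNext(items.get(index++));
                }
                if (index == items.size() && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                // Not needed for this demo.
            }
        });
    }

    // Subscribes to the mapped publisher and records every signal it receives.
    public static List<String> demo() {
        List<String> out = new ArrayList<>();
        Publisher<Integer> source = fromList(List.of(1, 2, 3));
        Publisher<String> mapped = map(source, i -> "item-" + i);

        mapped.subscribe(new Subscriber<String>() {
            @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { out.add(item); }
            @Override public void onError(Throwable t) { out.add("error: " + t); }
            @Override public void onComplete() { out.add("done"); }
        });
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [item-1, item-2, item-3, done]
    }
}
```

Because `Mapper` hands the upstream `Subscription` straight to the downstream subscriber, the `request(Long.MAX_VALUE)` call in `demo` reaches the list source directly, and each emitted integer passes through `mapper.apply` on its way down.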