Skip to content

Instantly share code, notes, and snippets.

@hanbei
Created December 20, 2017 09:52
Show Gist options
  • Save hanbei/6d3bd66fd4fdbd179628bc2122f0e369 to your computer and use it in GitHub Desktop.
Save hanbei/6d3bd66fd4fdbd179628bc2122f0e369 to your computer and use it in GitHub Desktop.
package rxsearcher.searcher.kelkoo;
import com.google.common.util.concurrent.Uninterruptibles;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
public class UndeliverableException {
@Test
public void undeliverable() {
// RxJavaPlugins.setErrorHandler(t -> System.out.println(t.getMessage()));
Flowable<Integer> flowable = Flowable.create(subscriber -> {
new Thread(() -> {
for (int i = 0; i < 10; i++) {
Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
subscriber.onNext(i);
}
subscriber.onError(new RuntimeException("Error"));
}).start();
}, BackpressureStrategy.MISSING);
Disposable subscribe = flowable
.subscribe(
System.out::print,
Throwable::printStackTrace
);
Uninterruptibles.sleepUninterruptibly(350, TimeUnit.MILLISECONDS);
subscribe.dispose();
Uninterruptibles.sleepUninterruptibly(11 * 300, TimeUnit.MILLISECONDS);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment