Skip to content

Instantly share code, notes, and snippets.

@coofee
Created April 7, 2017 11:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save coofee/2c901d12a607a58aaee86a139f1e46af to your computer and use it in GitHub Desktop.
Save coofee/2c901d12a607a58aaee86a139f1e46af to your computer and use it in GitHub Desktop.
rxjava2 Backpressure
package com.coffee.rx.rxjava2;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.ResourceSubscriber;
import java.io.IOException;
/**
 * Small RxJava 2 demo of backpressure with a blocking {@code Flowable.create} source whose
 * items are consumed on another thread via {@code observeOn}.
 *
 * <p>Created by zhaocongying on 17/4/7.
 */
public class BackpressureTest {

    /** Pause between two consecutive emissions, in milliseconds. */
    private static final long EMIT_INTERVAL_MILLIS = 100L;

    /**
     * Starts the asynchronous pipeline and then blocks on standard input so the demo keeps
     * running until the user presses a key (or stdin is closed).
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        question();
        System.out.println("start...");
        try {
            // Park the main thread; all the interesting work happens on scheduler threads.
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds and subscribes to the demo pipeline:
     * {@code create -> map -> subscribeOn(io) -> observeOn(newThread) -> subscriber}.
     *
     * <p>Original question: why does the subscriber's {@code onNext} get called only 128 times
     * with this code, yet receive every item once the {@code observeOn()} line is removed?
     *
     * <p>Answer: the emitter below never returns, so it permanently occupies the io worker
     * chosen by {@code subscribeOn}. {@code observeOn} prefetches its default buffer size
     * (128 items) while subscribing, which happens on that same worker and therefore goes
     * through. Every later replenishing request is issued from the {@code observeOn} thread;
     * the single-argument {@code subscribeOn(Scheduler)} reroutes such requests onto its own
     * worker, where they queue up behind the never-ending emitter loop and are never executed.
     * Without {@code observeOn}, the subscriber's one unbounded request is made during
     * subscription on the worker thread itself, so nothing has to be rerouted.
     *
     * <p>The fix is {@code subscribeOn(scheduler, false)} (available since RxJava 2.1.1), which
     * forwards requests to the upstream on whatever thread they arrive on.
     */
    private static void question() {
        Flowable.<String>create(emitter -> {
            // Stop producing as soon as the downstream cancels instead of spinning forever.
            for (int i = 0; !emitter.isCancelled(); i++) {
                System.out.println("\nsend: " + i);
                emitter.onNext(String.valueOf(i));
                try {
                    Thread.sleep(EMIT_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Interruption means we were asked to stop: keep the interrupt flag set
                    // for the scheduler and end the stream. onComplete() is a no-op if the
                    // downstream has already cancelled.
                    Thread.currentThread().interrupt();
                    emitter.onComplete();
                    return;
                }
            }
        }, BackpressureStrategy.BUFFER)
                .map(s -> {
                    System.out.println("map: " + s);
                    return s;
                })
                // requestOn = false: do not reroute downstream requests to the io worker,
                // which is blocked by the emitter loop above (see method Javadoc).
                .subscribeOn(Schedulers.io(), false)
                .observeOn(Schedulers.newThread())
                .subscribe(new ResourceSubscriber<String>() {
                    @Override
                    public void onNext(String s) {
                        System.out.println("ResourceSubscriber.onNext: " + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment