Skip to content

Instantly share code, notes, and snippets.

@zoetrope
Created July 26, 2016 10:16
Show Gist options
  • Save zoetrope/fad1071c471f8092d179a7b3628c2ef7 to your computer and use it in GitHub Desktop.
Save zoetrope/fad1071c471f8092d179a7b3628c2ef7 to your computer and use it in GitHub Desktop.
import rx.Observable;
import rx.Observer;
import rx.Producer;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.List;
/**
 * Small driver that demonstrates back-pressure handling: a hand-rolled
 * observable pushes the integers 1 through 4 into a pipeline that hops to a new
 * thread, keeps only even values and hands them to a {@code MyWorker}.
 */
public class BackPressureSample {
    /**
     * Wires up the pipeline and emits four values with pauses in between.
     *
     * @param args unused
     * @throws Exception if a sleep is interrupted, or if an emission is
     *         rejected by the source because no demand is outstanding
     */
    public static void main(String[] args) throws Exception {
        MyObservable<Integer> source = MyObservable.create();
        MyWorker<Integer> worker = new MyWorker<>();

        source
            // The bufferSize must be passed here; otherwise the default
            // request amount becomes 128.
            .observeOn(Schedulers.newThread(), 1)
            .filter(value -> value % 2 == 0)
            .subscribe(worker);

        final int lastItem = 4;
        for (int item = 1; item <= lastItem; item++) {
            source.onNext(item);
            // Short pause between emissions, a longer one after the final
            // emission so the worker thread has time to finish.
            Thread.sleep(item < lastItem ? 500 : 2000);
        }
    }
}
class MyProducer implements Producer {
private long req = 0;
long getRequest() {
System.out.println("MyProducer getRequest: " + req);
return req;
}
void decrement() {
req--;
System.out.println("MyProducer decrement: " + req);
}
public void request(long n) {
System.out.println("MyProducer request: " + n);
req = n;
}
}
/**
 * Subscription state shared by a {@code MyObservable}: remembers every
 * subscriber and forwards values to them as long as the producer reports
 * outstanding demand.
 *
 * @param <T> the type of the emitted values
 */
class MyState<T> implements Observable.OnSubscribe<T> {
    private final MyProducer producer;
    private final List<Subscriber<? super T>> subscribers = new ArrayList<>();

    /**
     * @param producer the producer handed to every subscriber and consulted
     *        for demand before each emission
     */
    MyState(MyProducer producer) {
        this.producer = producer;
    }

    /**
     * Invoked on subscription: installs the shared producer on the subscriber
     * and registers the subscriber for future emissions.
     *
     * @param subscriber the subscriber being attached
     */
    @Override
    public void call(Subscriber<? super T> subscriber) {
        System.out.println("MyState call");
        subscriber.setProducer(producer);
        subscribers.add(subscriber);
    }

    /**
     * Delivers {@code x} to every registered subscriber, consuming one unit of
     * demand from the producer.
     *
     * @param x the value to emit
     * @throws RuntimeException if the producer reports no outstanding demand
     *         (the message is Japanese for "back-pressure!")
     */
    void onNext(T x) {
        if (producer.getRequest() <= 0) {
            throw new RuntimeException("ばっくぷれっしゃ!");
        }
        producer.decrement();
        subscribers.forEach(target -> target.onNext(x));
    }
}
class MyObservable<T> extends Observable<T> implements Observer<T> {
static <T> MyObservable<T> create() {
MyProducer producer = new MyProducer();
MyState<T> state = new MyState<>(producer);
return new MyObservable<>(state);
}
private MyState state;
private MyObservable(MyState state) {
super(state);
this.state = state;
}
public void onCompleted() {
System.out.println("MyObservable onCompleted");
}
public void onError(Throwable e) {
System.out.println("MyObservable onError: " + e.getMessage());
}
public void onNext(T x) {
System.out.println("MyObservable onNext: " + x);
this.state.onNext(x);
}
}
/**
 * A deliberately slow subscriber that requests a single item up front and
 * logs everything it receives.
 *
 * @param <T> the type of the consumed values
 */
class MyWorker<T> extends Subscriber<T> {
    /** Logs the start and requests exactly one item. */
    @Override
    public void onStart() {
        System.out.println("MyWorker onStart");
        request(1);
    }

    /**
     * Simulates 100 ms of work, then logs the received value.
     *
     * @param o the received value
     */
    @Override
    public void onNext(T o) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so the calling thread can still
            // observe that it was interrupted.
            Thread.currentThread().interrupt();
        }
        System.out.println("MyWorker next: " + o);
        // Intentionally disabled: no further demand is signalled after an
        // item is consumed. Re-enable to request the next item.
        // request(1);
    }

    /** Logs the completion. */
    @Override
    public void onCompleted() {
        System.out.println("MyWorker onCompleted");
    }

    /**
     * Logs the error message and prints the stack trace.
     *
     * @param e the error that was signalled
     */
    @Override
    public void onError(Throwable e) {
        System.out.println("MyWorker error: " + e.getMessage());
        e.printStackTrace();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment