@Piasy
Last active October 25, 2016 10:41
Demonstrate ValueRequestOperator
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import rx.Emitter;
import rx.Observable;
import rx.Producer;
import rx.Subscriber;
import rx.Subscription;
import rx.schedulers.Schedulers;

public class ProducerTest {

    private static Observable<Long> source() {
        return Observable.fromEmitter(emitter -> {
            new Thread(() -> {
                while (true) {
                    long value = System.currentTimeMillis();
                    emitter.onNext(value);
                    sleep(10);
                }
            }, "source thread").start();
        }, Emitter.BackpressureMode.DROP);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // ignore
        }
    }

    private static void log(String message) {
        System.out.println(message
                + " @ "
                + Thread.currentThread().getName()
                + ", "
                + System.currentTimeMillis());
    }

    @Test
    public void test() {
        final Random random = new Random(System.currentTimeMillis());
        Subscription subscription = source()
                .observeOn(Schedulers.computation(), 1)
                .map(value -> {
                    sleep(50 + random.nextInt(50)); // simulate heavy job
                    if (value % 2 == 0) {
                        return String.valueOf(value);
                    } else {
                        return "";
                    }
                })
                .filter(str -> str.length() > 0)
                .observeOn(Schedulers.io(), 1)
                .lift(new ValueRequestOperator<>())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onStart() {
                        request(1);
                    }

                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(String s) {
                        log("final subscriber got " + s);
                        Observable.timer(1, TimeUnit.SECONDS)
                                .subscribe(l -> {
                                    log("request another");
                                    request(1);
                                });
                    }
                });
        sleep(10_000);
        subscription.unsubscribe();
    }

    static class ValueRequestOperator<T> implements Observable.Operator<T, T> {
        @Override
        public Subscriber<? super T> call(Subscriber<? super T> child) {
            ValueRequestProducer<T> producer = new ValueRequestProducer<>(child);
            Subscriber<T> subscriber = new Subscriber<T>() {
                @Override
                public void onCompleted() {
                    child.onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    child.onError(e);
                }

                @Override
                public void onNext(T s) {
                    producer.setValue(s);
                }
            };
            child.setProducer(producer);
            return subscriber;
        }
    }

    static class ValueRequestProducer<T> implements Producer {
        // No outstanding request and no buffered value.
        private static final int NO_REQ_NO_VALUE = 1;
        // No outstanding request; the latest upstream value is buffered in mValue.
        private static final int NO_REQ_HAS_VALUE = 2;
        // The child has requested a value, but none has arrived yet.
        private static final int HAS_REQ_NO_VALUE = 3;

        private final AtomicInteger mState = new AtomicInteger(NO_REQ_NO_VALUE);
        private final AtomicReference<T> mValue = new AtomicReference<>();
        private final Subscriber<? super T> mChild;

        ValueRequestProducer(Subscriber<? super T> child) {
            mChild = child;
        }

        // Called from the upstream onNext with each new value.
        public void setValue(T value) {
            while (true) {
                int state = mState.get();
                if (state == NO_REQ_NO_VALUE) {
                    // Nobody is waiting: buffer the value, retry if the state changed meanwhile.
                    mValue.set(value);
                    if (!mState.compareAndSet(NO_REQ_NO_VALUE, NO_REQ_HAS_VALUE)) {
                        continue;
                    }
                } else if (state == HAS_REQ_NO_VALUE) {
                    // A request is pending: deliver the value directly.
                    if (mState.compareAndSet(HAS_REQ_NO_VALUE, NO_REQ_NO_VALUE)) {
                        if (!mChild.isUnsubscribed()) {
                            mChild.onNext(value);
                            mValue.set(null);
                        }
                    }
                } else if (state == NO_REQ_HAS_VALUE) {
                    // Overwrite the buffered value so that only the latest one is kept.
                    mValue.set(value);
                }
                return;
            }
        }

        @Override
        public void request(long n) {
            if (n == 0) {
                return;
            }
            if (n < 0) {
                throw new IllegalStateException("Request can't be negative! " + n);
            }
            while (true) {
                int state = mState.get();
                if (state == NO_REQ_NO_VALUE) {
                    // Nothing buffered yet: remember the request, retry if the state changed meanwhile.
                    if (!mState.compareAndSet(NO_REQ_NO_VALUE, HAS_REQ_NO_VALUE)) {
                        continue;
                    }
                } else if (state == NO_REQ_HAS_VALUE) {
                    // A value is buffered: hand it to the child.
                    if (mState.compareAndSet(NO_REQ_HAS_VALUE, NO_REQ_NO_VALUE)) {
                        if (!mChild.isUnsubscribed()) {
                            T value = mValue.getAndSet(null);
                            if (value != null && !mChild.isUnsubscribed()) {
                                mChild.onNext(value);
                            }
                        }
                    }
                }
                return;
            }
        }
    }
}

Piasy commented Oct 25, 2016

The problem:

We have a data source that keeps emitting values on a separate thread. Each emitted value has to be processed, which takes some time, and the processing may or may not send a value downstream. At the subscriber, we need to show a dialog to the user; while the dialog is showing, no event should be sent to the subscriber.

The code above simulates this process: the source() function generates values, a map() operator simulates the processing, filter() drops illegal values, and finally we lift() the ValueRequestOperator onto the chain. We also need to switch threads between the steps, doing the processing on the computation scheduler and responding to the value on the main thread (here I use io so that the test can run on the JVM). Finally, a request issued one second later simulates the user's action on the dialog.

The ValueRequestOperator works like the SingleDelayedProducer in http://akarnokd.blogspot.com/2015/05/operator-concurrency-primitives_86.html, except for its state machine.
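
To make the state machine easier to discuss, here is a minimal plain-Java sketch of the same "keep the latest value, deliver on request" idea. It is not the code from the gist and not an RxJava API: the class name LatestOnRequest and its offer()/request() methods are hypothetical, and a java.util.function.Consumer stands in for the child Subscriber. The one deliberate difference is that the state and the value live in a single AtomicReference (empty, "requested" marker, or buffered value) instead of an AtomicInteger plus an AtomicReference. As far as I can tell, two separate atomics can briefly disagree when setValue() and request() interleave, so this is one possible variation to compare against, under those assumptions.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/**
 * Sketch only (hypothetical names): holds at most one value, the latest one,
 * and hands it to the downstream consumer once per request().
 */
final class LatestOnRequest<T> {
    // Marker meaning "a request is pending and no value has arrived yet".
    private static final Object REQUESTED = new Object();

    // null      -> no request, no value
    // REQUESTED -> request pending, no value
    // otherwise -> no request, buffered latest value
    private final AtomicReference<Object> slot = new AtomicReference<>();
    private final Consumer<? super T> downstream;

    LatestOnRequest(Consumer<? super T> downstream) {
        this.downstream = Objects.requireNonNull(downstream, "downstream");
    }

    /** Called by the producer side with each new (non-null) value. */
    void offer(T value) {
        Objects.requireNonNull(value, "value");
        while (true) {
            Object current = slot.get();
            if (current == REQUESTED) {
                // A request is pending: consume it and deliver directly.
                if (slot.compareAndSet(REQUESTED, null)) {
                    downstream.accept(value);
                    return;
                }
            } else if (slot.compareAndSet(current, value)) {
                // Empty or holding an older value: keep only the newest one.
                return;
            }
            // CAS lost a race with request(); re-read the slot and retry.
        }
    }

    /** Called by the consumer side when it is ready for one more value. */
    @SuppressWarnings("unchecked")
    void request() {
        while (true) {
            Object current = slot.get();
            if (current == REQUESTED) {
                return; // a request is already pending
            }
            if (current == null) {
                // Nothing buffered: remember the request.
                if (slot.compareAndSet(null, REQUESTED)) {
                    return;
                }
            } else if (slot.compareAndSet(current, null)) {
                // A value is buffered: take it out and deliver it.
                downstream.accept((T) current);
                return;
            }
            // CAS lost a race with offer(); re-read the slot and retry.
        }
    }
}
```

Calling offer("a"), offer("b") and then request() delivers only "b". Note that this sketch does not serialize calls to the consumer: if the consumer calls request() again from inside its callback, a concurrent offer() on another thread may invoke it while the first callback is still running, so a real operator would still need to take care of that.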

So I have two questions:

  1. In this use case, is implementing such an operator a good approach?
  2. Is the implementation of ValueRequestOperator correct?
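
For the second question, it may help to write down the intended behaviour as a simple lock-based model and compare the lock-free operator against it. The sketch below is only such a model, in plain Java with hypothetical names (LockedLatestGate, offer(), request()) and a Consumer in place of the Subscriber; it assumes that "correct" means: keep only the newest undelivered value, deliver at most one value per request, and never deliver without a request.

```java
import java.util.Objects;
import java.util.function.Consumer;

/**
 * Sketch only (hypothetical names): lock-based model of
 * "buffer the latest value, deliver one value per request".
 */
final class LockedLatestGate<T> {
    private final Consumer<? super T> downstream;
    private T latest;          // newest undelivered value, or null if none
    private boolean requested; // true while the consumer waits for a value

    LockedLatestGate(Consumer<? super T> downstream) {
        this.downstream = Objects.requireNonNull(downstream, "downstream");
    }

    /** Producer side: a new (non-null) value has been computed. */
    synchronized void offer(T value) {
        Objects.requireNonNull(value, "value");
        if (requested) {
            // Consume the pending request before calling out, so that a
            // re-entrant request() from the consumer is recorded correctly.
            requested = false;
            downstream.accept(value);
        } else {
            latest = value; // overwrite: older undelivered values are dropped
        }
    }

    /** Consumer side: ready for exactly one more value. */
    synchronized void request() {
        if (latest != null) {
            T value = latest;
            latest = null;
            downstream.accept(value);
        } else {
            requested = true;
        }
    }
}
```

Because both methods are synchronized, the consumer is never called concurrently, at the cost of holding a lock while calling out to it. Whether that trade-off is acceptable depends on how long the consumer callback runs; in the dialog scenario it would only need to schedule the dialog and return.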
