Skip to content

Instantly share code, notes, and snippets.

@YannRobert
Last active August 29, 2015 14:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save YannRobert/32921e8e88c62c5af1f9 to your computer and use it in GitHub Desktop.
Save YannRobert/32921e8e88c62c5af1f9 to your computer and use it in GitHub Desktop.
Shows that chaining two RxJava buffers, each configured with both a timespan and a count, eventually causes events to be emitted to the subscriber at a very slow rate.
package rxjava.issue;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Func1;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Demonstrates that chaining two RxJava {@code buffer} operators, each configured with both a
 * timespan and a maximum count, eventually causes items to reach the subscriber at a very slow rate.
 * <p>
 * The observed rate can become as slow as one item per timespan of the second buffer.
 *
 * @author Yann Robert &lt;yann.robert@anantaplex.fr&gt;
 */
public class RxJavaBufferWithTimespanBugTest {

    private static final Logger log = LoggerFactory.getLogger(RxJavaBufferWithTimespanBugTest.class);

    private static final int FIRST_BUFFER_MAXSIZE = 100;
    private static final int SECOND_BUFFER_MAXSIZE = 200;
    private static final long FIRST_BUFFER_TIMESPAN_IN_SECONDS = 2;
    private static final long SECOND_BUFFER_TIMESPAN_IN_SECONDS = 3;

    /** Number of items the source emits before it completes. */
    private static final int MAX_INDEX = 10 * 1000;

    /** Pause between two consecutive source emissions. */
    private static final long EMISSION_INTERVAL_IN_MILLIS = 3;

    /**
     * How long the test thread waits for a terminal event once {@code subscribe()} has returned.
     * Two full rounds of both buffer timespans leave room for a last time-triggered flush.
     */
    private static final long TERMINATION_GRACE_IN_SECONDS =
            2 * (FIRST_BUFFER_TIMESPAN_IN_SECONDS + SECOND_BUFFER_TIMESPAN_IN_SECONDS);

    /**
     * Pushes {@link #MAX_INDEX} items through two chained timespan-and-count buffers and checks that
     * the stream completes normally. Along the way it logs a warning whenever the gap between two
     * deliveries exceeds half of the shorter buffer timespan, and an error whenever the second
     * buffer degenerates to a single item.
     */
    @Test(timeout = 60 * 1000)
    public void showUsageOfTwoBuffersWithTimespanIsDegeneratingAfterAWhile() {
        final AtomicInteger originalObservableIndex = new AtomicInteger(0);
        final AtomicBoolean onCompletedCalled = new AtomicBoolean(false);
        final AtomicReference<Throwable> streamFailure = new AtomicReference<Throwable>();
        final CountDownLatch terminated = new CountDownLatch(1);

        // this is an Observable emitting a lot of values at a very high rate
        Observable<Integer> observable = createFastSource(originalObservableIndex);

        observable
                .buffer(FIRST_BUFFER_TIMESPAN_IN_SECONDS, TimeUnit.SECONDS, FIRST_BUFFER_MAXSIZE)
                .flatMap(new Func1<List<Integer>, Observable<Integer>>() {
                    @Override
                    public Observable<Integer> call(List<Integer> integers) {
                        log.info("first buffer size = {}", integers.size());
                        return Observable.from(integers);
                    }
                })
                .buffer(SECOND_BUFFER_TIMESPAN_IN_SECONDS, TimeUnit.SECONDS, SECOND_BUFFER_MAXSIZE)
                .flatMap(new Func1<List<Integer>, Observable<Integer>>() {
                    @Override
                    public Observable<Integer> call(List<Integer> integers) {
                        log.info("second buffer size = {}", integers.size());
                        if (integers.size() == 1) {
                            // originalObservableIndex continues to increment very fast,
                            // so the subscribers are getting later and later
                            log.error("There is a problem now, the second buffer contains only 1 item and originalObservableIndex = {}",
                                    originalObservableIndex.get());
                        }
                        return Observable.from(integers);
                    }
                })
                .subscribe(new Observer<Integer>() {
                    // Half of the shorter buffer timespan, in milliseconds.
                    private final long warningThresholdInMillis = TimeUnit.SECONDS.toMillis(
                            Math.min(FIRST_BUFFER_TIMESPAN_IN_SECONDS, SECOND_BUFFER_TIMESPAN_IN_SECONDS)) / 2;
                    // nanoTime is monotonic, so wall-clock adjustments cannot distort the measured gap.
                    private long lastItemNotificationNanos = System.nanoTime();

                    @Override
                    public void onCompleted() {
                        onCompletedCalled.set(true);
                        // Normal termination: informational, not an error.
                        log.info("onCompleted()");
                        terminated.countDown();
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        log.error("onError : " + throwable.getClass().getName() + " : " + throwable.getMessage(), throwable);
                        // Record the failure for the test thread instead of throwing from the
                        // callback, which may be running on a scheduler thread.
                        streamFailure.set(throwable);
                        terminated.countDown();
                    }

                    @Override
                    public void onNext(Integer value) {
                        long notificationNanos = System.nanoTime();
                        log.info("value = {}", value);
                        long timeSinceLastNotification =
                                TimeUnit.NANOSECONDS.toMillis(notificationNanos - lastItemNotificationNanos);
                        if (timeSinceLastNotification > warningThresholdInMillis) {
                            log.warn("Last notification was a long time ago : timeSinceLastNotification = {} ms",
                                    timeSinceLastNotification);
                        }
                        lastItemNotificationNanos = notificationNanos;
                    }
                });

        // The timed buffers also emit from a scheduler thread, so the terminal event may be
        // delivered shortly after subscribe() returns; wait for it before asserting.
        boolean terminatedInTime = awaitTermination(terminated);

        Throwable failure = streamFailure.get();
        if (failure != null) {
            AssertionError error = new AssertionError("The stream terminated with an error : " + failure);
            error.initCause(failure);
            throw error;
        }
        Assert.assertTrue("No terminal event received within " + TERMINATION_GRACE_IN_SECONDS
                + " seconds after the source finished emitting", terminatedInTime);
        Assert.assertTrue(onCompletedCalled.get());
    }

    /**
     * Builds the source: a synchronous Observable that emits the integers {@code 1..MAX_INDEX},
     * pausing {@link #EMISSION_INTERVAL_IN_MILLIS} ms after each one, then completes.
     *
     * @param index shared counter holding the last emitted value; the pipeline reads it for diagnostics
     * @return the source Observable
     */
    private static Observable<Integer> createFastSource(final AtomicInteger index) {
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                try {
                    // Stop producing as soon as the downstream has unsubscribed.
                    while (!subscriber.isUnsubscribed() && index.get() < MAX_INDEX) {
                        subscriber.onNext(index.incrementAndGet());
                        Thread.sleep(EMISSION_INTERVAL_IN_MILLIS);
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the caller can still observe the interruption.
                    Thread.currentThread().interrupt();
                    subscriber.onError(new RuntimeException(e));
                    return;
                } catch (RuntimeException e) {
                    // Signal the failure exactly once; rethrowing as well would report it twice.
                    subscriber.onError(e);
                    return;
                }
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            }
        });
    }

    /**
     * Waits up to {@link #TERMINATION_GRACE_IN_SECONDS} seconds for the latch to reach zero.
     *
     * @param terminated latch counted down by the subscriber on its terminal event
     * @return {@code true} if the terminal event was observed, {@code false} otherwise
     */
    private static boolean awaitTermination(CountDownLatch terminated) {
        try {
            return terminated.await(TERMINATION_GRACE_IN_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // The terminal event may already have been delivered before the interruption.
            return terminated.getCount() == 0;
        }
    }
}
@YannRobert
Copy link
Author

referenced by ReactiveX/RxJava#1867

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment