Last active
August 29, 2015 14:09
-
-
Save YannRobert/32921e8e88c62c5af1f9 to your computer and use it in GitHub Desktop.
Shows that chaining two RxJava buffers, each configured with both a timespan and a count, eventually causes events to reach the subscriber at a very slow rate.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package rxjava.issue; | |
import org.junit.Assert; | |
import org.junit.Test; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import rx.Observable; | |
import rx.Observer; | |
import rx.Subscriber; | |
import rx.functions.Func1; | |
import java.util.List; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
/** | |
* This test shows that the usage of 2 RxJava buffers with both a timespan and a count will eventually make the events be emitted at a very slow rate to the subscriber. | |
* The observed rate will be as slow as 1 item per whatever is the second buffer timespan. | |
* | |
* @author Yann Robert <yann.robert@anantaplex.fr> | |
*/ | |
public class RxJavaBufferWithTimespanBugTest { | |
private final static Logger log = LoggerFactory.getLogger(RxJavaBufferWithTimespanBugTest.class); | |
private static final int FIRST_BUFFER_MAXSIZE = 100; | |
private static final int SECOND_BUFFER_MAXSIZE = 200; | |
private static final long FIRST_BUFFER_TIMESPAN_IN_SECONDS = 2; | |
private static final long SECOND_BUFFER_TIMESPAN_IN_SECONDS = 3; | |
@Test(timeout = 60 * 1000) | |
public void showUsageOfTwoBuffersWithTimespanIsDegeneratingAfterAWhile() { | |
final AtomicInteger originalObservableIndex = new AtomicInteger(0); | |
final AtomicBoolean onCompletedCalled = new AtomicBoolean(false); | |
final int MAX_INDEX = 10 * 1000; | |
// this is an Observable emitting a lot of values at a very high rate | |
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { | |
@Override | |
public void call(Subscriber<? super Integer> subscriber) { | |
try { | |
while (originalObservableIndex.get() < MAX_INDEX) { | |
subscriber.onNext(originalObservableIndex.incrementAndGet()); | |
try { | |
Thread.sleep(3); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
subscriber.onCompleted(); | |
} catch (RuntimeException e) { | |
subscriber.onError(e); | |
throw e; | |
} | |
} | |
}); | |
observable.buffer(FIRST_BUFFER_TIMESPAN_IN_SECONDS, TimeUnit.SECONDS, FIRST_BUFFER_MAXSIZE).flatMap(new Func1<List<Integer>, Observable<Integer>>() { | |
@Override | |
public Observable<Integer> call(List<Integer> integers) { | |
log.info("first buffer size = " + integers.size()); | |
return Observable.from(integers); | |
} | |
}).buffer(SECOND_BUFFER_TIMESPAN_IN_SECONDS, TimeUnit.SECONDS, SECOND_BUFFER_MAXSIZE).flatMap(new Func1<List<Integer>, Observable<Integer>>() { | |
@Override | |
public Observable<Integer> call(List<Integer> integers) { | |
log.info("second buffer size = " + integers.size()); | |
if (integers.size() == 1) { | |
// originalObservableIndex continues to increment very fast, so the subscribers are getting later and later | |
log.error("There is a problem now, the second buffer contains only 1 item and originalObservableIndex = " + originalObservableIndex.get()); | |
} | |
return Observable.from(integers); | |
} | |
}).subscribe(new Observer<Integer>() { | |
private long lastItemNotificationTime = System.currentTimeMillis(); | |
private final long warningThreshold = Math.min(FIRST_BUFFER_TIMESPAN_IN_SECONDS, SECOND_BUFFER_TIMESPAN_IN_SECONDS) * 1000 / 2; | |
@Override | |
public void onCompleted() { | |
onCompletedCalled.set(true); | |
log.error("onCompleted()"); | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
log.error("onError : " + throwable.getClass().getName() + " : " + throwable.getMessage(), throwable); | |
if (throwable instanceof RuntimeException) { | |
throw (RuntimeException) throwable; | |
} else { | |
throw new RuntimeException(throwable); | |
} | |
} | |
@Override | |
public void onNext(Integer value) { | |
long notificationTime = System.currentTimeMillis(); | |
log.info("value = " + value); | |
long timeSinceLastNotification = notificationTime - lastItemNotificationTime; | |
if (timeSinceLastNotification > warningThreshold) { | |
log.warn("Last notification was a long time ago : timeSinceLastNotification = " + timeSinceLastNotification + " ms"); | |
} | |
lastItemNotificationTime = notificationTime; | |
} | |
}); | |
Assert.assertTrue(onCompletedCalled.get()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
referenced by ReactiveX/RxJava#1867