Skip to content

Instantly share code, notes, and snippets.

@YannRobert
Created November 19, 2014 18:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save YannRobert/972b95dcf1bae66791bc to your computer and use it in GitHub Desktop.
Save YannRobert/972b95dcf1bae66791bc to your computer and use it in GitHub Desktop.
RxJava Test demonstrating an issue (duplicate emitions) when using delay(duration) in combination with buffer(count, duration)
package rxjava.issue;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Action1;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class BufferMayProduceDuplicatesTest {
private final static Logger log = LoggerFactory.getLogger(BufferMayProduceDuplicatesTest.class);
@Test // succeed
public void bufferIsWorkingWithoutDelay() {
int itemCount = 1;
int bufferSize = itemCount + 1;
Observable.range(1, itemCount).buffer(bufferSize, 3, TimeUnit.SECONDS).asObservable().toBlocking().forEach(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> values) {
log.info("values = {}", values);
Assert.assertEquals(1, values.size());
}
});
}
@Test // fails
public void issueHere() {
final AtomicInteger count = new AtomicInteger(0);
Observable.range(1, 1).delay(10, TimeUnit.SECONDS).buffer(10, 3, TimeUnit.SECONDS).asObservable().toBlocking().forEach(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> values) {
log.info("values = {}", values);
count.addAndGet(values.size());
}
});
Assert.assertEquals(1, count.get()); // Actual = 4
}
@Test // succeed
public void shouldNotEmitDuplicatesWhenDelayIsZero() {
int itemCount = 1;
int bufferDuration = 10;
int delayDuration = 0;
doTheThing(itemCount, delayDuration, bufferDuration);
}
@Test // fails (actual = 4)
public void shouldNotEmitDuplicatesWhenDelayIs3TimesHigherThanBufferDuration() {
int itemCount = 1;
int bufferDuration = 3;
int delayDuration = bufferDuration * 3;
doTheThing(itemCount, delayDuration, bufferDuration);
}
@Test // fails (actual = 3)
public void shouldNotEmitDuplicatesWhenDelayIs2TimesHigherThanBufferDuration() {
int itemCount = 1;
int bufferDuration = 3;
int delayDuration = bufferDuration * 2;
doTheThing(itemCount, delayDuration, bufferDuration);
}
@Test // succeed
public void shouldNotEmitDuplicatesWhenDelayIs3TimesLowerThanBufferDuration() {
int itemCount = 1;
int bufferDuration = 3;
int delayDuration = bufferDuration / 3;
doTheThing(itemCount, delayDuration, bufferDuration);
}
@Test // succeed
public void shouldNotEmitDuplicatesWhenDelayIs2TimesLowerThanBufferDuration() {
int itemCount = 1;
int bufferDuration = 4;
int delayDuration = bufferDuration / 2;
doTheThing(itemCount, delayDuration, bufferDuration);
}
private void doTheThing(final int itemCount, final int delayDuration, final long bufferDuration) {
final int bufferSize = itemCount * 10;
final AtomicInteger count = new AtomicInteger(0);
Observable.range(1, itemCount).delay(delayDuration, TimeUnit.SECONDS).buffer(bufferSize, bufferDuration, TimeUnit.SECONDS).asObservable().toBlocking().forEach(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> values) {
log.info("values = {}", values);
count.addAndGet(values.size());
}
});
Assert.assertEquals(itemCount, count.get());
}
}
@YannRobert
Copy link
Author

referenced by ReactiveX/RxJava#1896

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment