Skip to content

Instantly share code, notes, and snippets.

@jcfandino
Last active August 29, 2015 14:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jcfandino/fd47277ada821f51a9d4 to your computer and use it in GitHub Desktop.
Save jcfandino/fd47277ada821f51a9d4 to your computer and use it in GitHub Desktop.
RxJava batches, window vs. buffer
import static org.junit.Assert.assertTrue;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
/**
 * Exercises two RxJava batching operators, {@code window} and {@code buffer}, on the same
 * five-element source.
 *
 * <p>Each test maps the source through {@link #delayOn4}, batches the result, and subscribes an
 * {@link UpdateCountdowns}. The test then asserts that both latches ({@link #itemsCount} for
 * individual items, {@link #batchesCount} for batches) reach zero within the timeout.
 */
public class RxJavaTest {

    /** Maximum time, in milliseconds, each assertion waits for a latch to reach zero. */
    private static final long AWAIT_TIMEOUT_MILLIS = 100;

    /**
     * Subscriber for a stream of batches. Every batch received counts down
     * {@code batchesCount}; every item inside a batch is printed and counts down
     * {@code itemsCount}.
     */
    private final class UpdateCountdowns implements Action1<Observable<String>> {

        /** Label prefixed to every line printed by this subscriber. */
        private final String name;

        /**
         * @param aName label used to tell the output of different tests apart
         */
        protected UpdateCountdowns(String aName) {
            name = aName;
        }

        /**
         * Records one batch and subscribes to its items.
         *
         * @param w the batch, delivered as an observable of its items
         */
        @Override
        public void call(Observable<String> w) {
            batchesCount.countDown();
            w.subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    // hashCode() resolves to this anonymous per-batch subscriber, so the
                    // printed number identifies which batch the item arrived in.
                    System.out.println(name + " - " + hashCode() + ": " + s);
                    itemsCount.countDown();
                }
            });
        }
    }

    /** Intentionally empty: all fixtures are initialised as instance fields below. */
    @Before
    public void setUp() throws Exception {
    }

    /** Source stream shared by both tests: the strings "1" through "5". */
    Observable<String> observable = Observable.from("1", "2", "3", "4", "5");

    /** Counts down once per item delivered to an {@link UpdateCountdowns} batch subscriber. */
    final CountDownLatch itemsCount = new CountDownLatch(5);

    /** Counts down once per batch delivered to an {@link UpdateCountdowns}. */
    final CountDownLatch batchesCount = new CountDownLatch(3);

    /**
     * Identity mapping that sleeps for 10 ms before returning the element "4", introducing a
     * pause in the middle of the stream. All other elements pass through immediately.
     */
    Func1<String, String> delayOn4 = new Func1<String, String>() {
        @Override
        public String call(String s) {
            if ("4".equals(s)) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    // Restore the interrupt status instead of swallowing it, so that the
                    // thread driving the stream (and any later blocking call on it, such as
                    // the latch awaits in the tests) still observes the interruption.
                    Thread.currentThread().interrupt();
                }
            }
            return s;
        }
    };

    /**
     * Batches the delayed source with {@code window(100 ms, 5)} and expects all items and
     * batches to be counted within the timeout.
     *
     * <p>NOTE(review): the window count is 5 and the source has five items, while
     * {@link #batchesCount} waits for three batches - confirm these parameters are the
     * intended ones.
     *
     * @throws InterruptedException if the test thread is interrupted while awaiting a latch
     */
    @Test
    public void testBatchingUsingWindow() throws InterruptedException {
        observable.map(delayOn4)
                .window(100, TimeUnit.MILLISECONDS, 5)
                .subscribe(new UpdateCountdowns("window"));

        assertTrue(itemsCount.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
        assertTrue(batchesCount.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
    }

    /**
     * Batches the delayed source with {@code buffer(10 ms, 2)}, converts each emitted list back
     * into an observable so the same {@link UpdateCountdowns} subscriber can be reused, and
     * expects all items and batches to be counted within the timeout.
     *
     * @throws InterruptedException if the test thread is interrupted while awaiting a latch
     */
    @Test
    public void testBatchingUsingBuffer() throws InterruptedException {
        observable.map(delayOn4)
                .buffer(10, TimeUnit.MILLISECONDS, 2)
                .map(new Func1<List<String>, Observable<String>>() {
                    @Override
                    public Observable<String> call(List<String> l) {
                        return Observable.from(l);
                    }
                })
                .subscribe(new UpdateCountdowns("buffer"));

        assertTrue(itemsCount.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
        assertTrue(batchesCount.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment