Skip to content

Instantly share code, notes, and snippets.

@gjesse
Created November 21, 2015 15:46
Show Gist options
  • Save gjesse/8d1d06921733cdf87a6e to your computer and use it in GitHub Desktop.
Save gjesse/8d1d06921733cdf87a6e to your computer and use it in GitHub Desktop.
Example of using `distinctUntilChanged` and `window` to segment an ordered Observable into discrete buckets.
package net.loshodges;
import com.google.common.collect.Lists;
import org.junit.Test;
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Example of segmenting an ordered {@link Observable} into discrete buckets by using
 * {@code distinctUntilChanged} on a derived key as the boundary for {@code window}.
 */
public class TestPredicateWindow {

    /** Percentage (0-100) of source items that survive the random filter. */
    private static final long PERCENT_KEEP = 75L;

    /** Number of sequential values emitted by the source before filtering. */
    private static final int MAX_ITEMS = 100000;

    /** Bucket width: values with the same {@code value / BUCKET_SIZE} belong to one window. */
    private static final int BUCKET_SIZE = 10;

    /** Upper bound on how long the test waits for the asynchronous pipeline before failing. */
    private static final long TIMEOUT_SECONDS = 30L;

    /**
     * With an ordered input observable, we want to segment the stream into
     * discrete buckets based on a value of the objects themselves (time in real life).
     * We know that the stream is ordered, but we cannot guarantee to have a datapoint
     * for every possible value in the stream, thus we cannot use the
     * {@link Observable#window(int)} methods. The time based window methods will not work
     * either, as we may not be processing these in real time.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the
     *         stream to terminate
     */
    @Test
    public void testPredicateWindow() throws InterruptedException {
        Observable<Long> source = Observable.range(0, MAX_ITEMS, Schedulers.computation())
                .map(Long::valueOf)
                .filter(this::random);

        CountDownLatch done = new CountDownLatch(1);
        List<TestSubscriber<List<Long>>> subs = Lists.newCopyOnWriteArrayList();
        List<Throwable> errors = Lists.newCopyOnWriteArrayList();

        // The stream is consumed twice: once to derive the bucket boundaries and once as the
        // windowed data. publish(selector) wires up both consumers before connecting to the
        // source; share() would connect on the first subscriber and, because the source emits
        // on another thread, the second subscriber could miss the earliest items.
        Observable<Observable<Long>> windows = source.publish(shared ->
                shared.window(shared.distinctUntilChanged(i -> i / BUCKET_SIZE)));

        windows.subscribe(window -> {
            // Subscribe to each window immediately so none of its items are missed.
            TestSubscriber<List<Long>> sub = new TestSubscriber<>();
            window.toList().subscribe(sub);
            subs.add(sub);
        }, error -> {
            // Record the failure and release the latch; otherwise an error would leave the
            // test thread blocked and the failure would go unreported.
            errors.add(error);
            done.countDown();
        }, done::countDown);

        boolean terminated = done.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        assertThat(errors).as("errors raised by the windowed stream").isEmpty();
        assertThat(terminated).as("stream terminated within %d seconds", TIMEOUT_SECONDS).isTrue();

        for (TestSubscriber<List<Long>> sub : subs) {
            sub.awaitTerminalEvent(TIMEOUT_SECONDS, TimeUnit.SECONDS);
            sub.assertNoErrors();
            // toList() emits exactly one list per window.
            sub.assertValueCount(1);
            // A bucket spans BUCKET_SIZE consecutive values, so no window may hold more.
            assertThat(sub.getOnNextEvents().get(0).size()).isLessThanOrEqualTo(BUCKET_SIZE);
            System.out.println(sub.getOnNextEvents());
        }
        System.out.println(subs.size() + " windows from " + MAX_ITEMS);
    }

    /**
     * Randomly decides whether an item is kept, letting through roughly
     * {@link #PERCENT_KEEP} percent of calls. This punches gaps into the ordered sequence.
     *
     * @param aLong the candidate item; its value is not inspected
     * @return {@code true} if the item should be kept
     */
    private Boolean random(Long aLong) {
        return ThreadLocalRandom.current().nextLong(100) < PERCENT_KEEP;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment