@alexwen
Forked from gjesse/TestPredicateWindow.java
Last active January 5, 2016 19:23
Example of using distinctUntilChanged and window to segment an ordered observable into discrete buckets.
package net.loshodges;

import org.junit.Test;
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;

import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertThat;

public class TestPredicateWindow {

    private static final long PERCENT_KEEP = 75L;
    private static final int MAX_ITEMS = 100000;

    @Test
    public void testPredicateWindow() throws InterruptedException {
        /*
         * With an ordered input observable, we want to segment the stream into
         * discrete buckets based on a value of the objects themselves (time in real life).
         * We know that the stream is ordered, but we cannot guarantee a datapoint
         * for every possible value in the stream, so we cannot use the
         * {@link Observable#window(int)} methods. The time-based window methods will not
         * work either, as we may not be processing these in real time.
         */
        final Observable<Long> source = Observable.range(0, MAX_ITEMS, Schedulers.computation())
                .map(Long::valueOf)
                .filter(this::random);

        // publish(selector) is a convenient shorthand for sharing one subscription
        // between the window source and its boundary
        final Observable<List<Long>> ranges = source
                .publish(s -> {
                    // a boundary fires whenever the bucket key (i / 10) changes;
                    // skip the first boundary, as the window it would close is empty
                    final Observable<Long> boundary = s.distinctUntilChanged(i -> i / 10).skip(1);
                    return s.window(boundary).flatMap(Observable::toList);
                });

        final CountDownLatch done = new CountDownLatch(1);
        final TestSubscriber<List<Long>> sub = new TestSubscriber<>();

        ranges.doOnNext(System.out::println).doOnCompleted(done::countDown).subscribe(sub);
        done.await();

        sub.assertNoErrors();
        // each bucket covers ten consecutive values, so it can hold at most ten items
        sub.getOnNextEvents().forEach(l -> assertThat(l, hasSize(lessThanOrEqualTo(10))));
        System.out.println(sub.getOnNextEvents().size() + " windows from " + MAX_ITEMS);
    }

    // randomly keeps roughly PERCENT_KEEP percent of the items, so the input has gaps
    private Boolean random(Long aLong) {
        return ThreadLocalRandom.current().nextLong(100) < PERCENT_KEEP;
    }
}
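
To make the expected bucketing concrete, here is a minimal plain-Java sketch of the same segmentation on a small fixed input. It is not the Rx pipeline above and uses no RxJava calls; the class name `KeyedBuckets`, the method `segment`, and the `width` parameter are illustrative names that do not appear in the gist. It assumes a non-empty, ordered input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for the Rx pipeline; illustrative only, no RxJava involved.
final class KeyedBuckets {

    // Splits an ordered list into runs of consecutive items sharing the key item / width.
    static List<List<Long>> segment(List<Long> ordered, long width) {
        List<List<Long>> buckets = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentKey = 0;
        for (long item : ordered) {
            long key = item / width;
            // A key change plays the role of the boundary observable firing.
            if (!current.isEmpty() && key != currentKey) {
                buckets.add(current);
                current = new ArrayList<>();
            }
            currentKey = key;
            current.add(item);
        }
        // The last bucket is closed by the end of the input (onCompleted in Rx terms).
        if (!current.isEmpty()) {
            buckets.add(current);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<Long> input = Arrays.asList(1L, 2L, 3L, 11L, 12L, 25L, 31L);
        // prints [[1, 2, 3], [11, 12], [25], [31]]
        System.out.println(segment(input, 10));
    }
}
```

Values missing from the input (4 through 10, for instance) simply leave a bucket shorter, which is why the test asserts at most ten items per list rather than exactly ten.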
@gjesse

gjesse commented Jan 5, 2016

So I realized the problem with this (and my other implementation) is that it relies on the next onNext item to close the previous window, which introduces a delay that's no good for lower-rate observables. I think I should be able to modify the example I had (https://gist.github.com/gjesse/72a478e5b8248861dd2c) to do this without too much trouble, but I don't see a way to do it here unless a delay is added to the boundary, which in turn breaks for continuous observables:

final Observable<Long> boundary = s.distinctUntilChanged(i -> i / 10).skip(1).delay(10, TimeUnit.MILLISECONDS);
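
The close-on-next-item behaviour can be sketched in plain Java, without Rx. The class `LazyCloseBucketer` and its `accept` method are hypothetical names made up for illustration, not part of either gist. The point is only that a finished bucket stays held back until an item with a different key shows up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical incremental bucketer; plain Java, not an RxJava operator.
final class LazyCloseBucketer {
    private final long width;
    private List<Long> open = new ArrayList<>();
    private long openKey;

    LazyCloseBucketer(long width) {
        this.width = width;
    }

    // Adds an item and returns the bucket that this item closed, if any.
    Optional<List<Long>> accept(long item) {
        long key = item / width;
        Optional<List<Long>> closed = Optional.empty();
        // Only an item with a new key can close the bucket that is currently open.
        if (!open.isEmpty() && key != openKey) {
            closed = Optional.of(open);
            open = new ArrayList<>();
        }
        openKey = key;
        open.add(item);
        return closed;
    }

    public static void main(String[] args) {
        LazyCloseBucketer bucketer = new LazyCloseBucketer(10);
        System.out.println(bucketer.accept(7));   // prints Optional.empty
        System.out.println(bucketer.accept(9));   // prints Optional.empty
        System.out.println(bucketer.accept(42));  // prints Optional[[7, 9]]
    }
}
```

The bucket `[7, 9]` is complete once 9 has arrived, yet it is only handed over when 42 arrives, however long that takes. On a slow source that wait is the delay described above.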
