Skip to content

Instantly share code, notes, and snippets.

@natdempk
Created July 13, 2017 02:01
Show Gist options
  • Save natdempk/b13f5ab0a6e98f8063e3889ac40f04b0 to your computer and use it in GitHub Desktop.
Save natdempk/b13f5ab0a6e98f8063e3889ac40f04b0 to your computer and use it in GitHub Desktop.
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
public class TestMain {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList())
.stream().map(i -> {
System.out.println("emitting:" + i);
return i;
});
Long count = Flowable.fromIterable(() -> ints.iterator())
.onBackpressureBuffer(10)
.buffer(10)
.observeOn(Schedulers.computation())
.flatMap(buf -> {
System.out.println("Got batch of events");
return Flowable.fromIterable(buf);
})
.map(x -> x + 1)
.observeOn(Schedulers.io())
.doOnNext(i -> {
System.out.println(String.format("Sleeping : %d", i));
Thread.sleep(100);
System.out.println(i);
})
.observeOn(Schedulers.computation())
.count()
.blockingGet();
System.out.println("count: " + count);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment