@JakeWharton
Created June 12, 2014 15:38
Map Java 8 streams to RxJava Observables.

Output

Greeting: Hey
Greeting: Hi
Greeting: Hello

[60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
[70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
[80, 81, 82, 83, 84, 85, 86, 87, 88, 89]
[90, 91, 92, 93, 94, 95, 96, 97, 98, 99]

import java.util.Iterator;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import rx.Observable;
import rx.Subscriber;

/** An Observable that emits the elements of a Java 8 Stream. */
public class RxStreams<T> extends Observable<T> {
  public static <T> RxStreams<T> from(Stream<T> stream) {
    return new RxStreams<T>(new OnStreamSubscribe<T>(stream));
  }

  protected RxStreams(OnStreamSubscribe<T> func) {
    super(func);
  }

  protected static class OnStreamSubscribe<T> implements OnSubscribe<T> {
    // A Stream can be consumed only once, so only the first subscription can succeed.
    private final Stream<T> stream;

    public OnStreamSubscribe(Stream<T> stream) {
      this.stream = stream;
    }

    @Override public void call(Subscriber<? super T> subscriber) {
      // Pull elements one at a time and stop as soon as the subscriber unsubscribes.
      for (Iterator<T> i = stream.iterator(); i.hasNext(); ) {
        if (subscriber.isUnsubscribed()) {
          return;
        }
        subscriber.onNext(i.next());
      }
      subscriber.onCompleted();
    }
  }

  public static void main(String... args) {
    // Emits three strings and prefixes each one.
    Observable<String> o1 = RxStreams.from(Stream.of("Hey", "Hi", "Hello"));
    o1.map(s -> "Greeting: " + s).subscribe(System.out::println);

    // Emits "0".."99", drops the first 60, and prints the rest in groups of 10.
    Observable<String> o2 = RxStreams.from(IntStream.range(0, 100).mapToObj(String::valueOf));
    o2.skip(60).buffer(10).subscribe(System.out::println);
  }
}

@nottmey

nottmey commented Jun 20, 2014

Hello Jake,
Isn't it better to use this simpler version?

Stream stream = ...
Observable.from(() -> stream.iterator());

@tokland

tokland commented May 13, 2016

@nottmey: Thanks very much, I had been looking for this for a while now! I am a bit confused, though: which signature of Observable.from is used when it is invoked with a callable? I cannot seem to find it: http://reactivex.io/RxJava/javadoc/rx/Observable.html
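
On the overload question, a likely explanation is that the lambda is not treated as a callable at all. Iterable declares a single abstract method, iterator(), so a zero-argument lambda that returns an Iterator can be converted to an Iterable, and the call would then resolve to Observable.from(Iterable<? extends T>). The sketch below shows the same conversion using only the JDK; the class LambdaAsIterable and the helper drain are hypothetical stand-ins for any method that accepts an Iterable, not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class LambdaAsIterable {
    // Hypothetical stand-in for a method that, like Observable.from(Iterable),
    // accepts an Iterable and walks it once.
    static <T> List<T> drain(Iterable<T> source) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        Stream<String> stream = Stream.of("Hey", "Hi", "Hello");

        // The lambda is converted to an Iterable<String>: its body becomes iterator().
        List<String> viaLambda = drain(() -> stream.iterator());
        System.out.println(viaLambda); // prints [Hey, Hi, Hello]

        // A method reference is converted to an Iterable in the same way.
        List<String> viaMethodRef = drain(Stream.of("a", "b")::iterator);
        System.out.println(viaMethodRef); // prints [a, b]
    }
}
```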

@krevelen

@arturom

arturom commented Oct 1, 2016

Or, more succinctly, you can use:

Stream stream = ...
Observable.from(stream::iterator);
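
One caveat applies to both short forms: a Stream can be consumed only once, so an Iterable backed by a single Stream instance fails with an IllegalStateException the second time it is iterated (for example, on a second subscription). A minimal JDK-only sketch of the problem and one possible workaround follows; the class ReusableStreamIterable and the helpers iterableOf and drain are hypothetical names used for illustration, and the Supplier-based approach is an assumption, not something the gist proposes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

class ReusableStreamIterable {
    // Hypothetical helper: asks the supplier for a fresh Stream on every iteration,
    // so the resulting Iterable can be walked more than once.
    static <T> Iterable<T> iterableOf(Supplier<Stream<T>> streams) {
        return () -> streams.get().iterator();
    }

    // Hypothetical helper: collects one full pass over the Iterable.
    static <T> List<T> drain(Iterable<T> source) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        Stream<String> once = Stream.of("Hey", "Hi", "Hello");
        Iterable<String> singleUse = once::iterator;
        System.out.println(drain(singleUse)); // prints [Hey, Hi, Hello]
        try {
            drain(singleUse); // second pass calls once.iterator() again
        } catch (IllegalStateException e) {
            System.out.println("second pass failed: stream already consumed");
        }

        Iterable<String> reusable = iterableOf(() -> Stream.of("Hey", "Hi", "Hello"));
        System.out.println(drain(reusable)); // prints [Hey, Hi, Hello]
        System.out.println(drain(reusable)); // prints [Hey, Hi, Hello]
    }
}
```

With RxJava on the classpath, the Iterable returned by iterableOf could presumably be passed to Observable.from in place of stream::iterator.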

@cogmission

...and to go back the other way (RxJava Observable to java.util.Stream), I worked this out:

http://www.cogmission.ai/2016/04/06/speed-blog-how-to-convert-an-rx-observable-to-a-java-util-stream/