Greeting: Hey
Greeting: Hi
Greeting: Hello
[60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
[70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
[80, 81, 82, 83, 84, 85, 86, 87, 88, 89]
[90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
Created
June 12, 2014 15:38
-
-
Save JakeWharton/e133433be99780529476 to your computer and use it in GitHub Desktop.
Map Java 8 streams to RxJava Observables.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Iterator; | |
import java.util.stream.IntStream; | |
import java.util.stream.Stream; | |
import rx.Observable; | |
import rx.Subscriber; | |
public class RxStreams<T> extends Observable<T> { | |
public static <T> RxStreams<T> from(Stream<T> stream) { | |
return new RxStreams<T>(new OnStreamSubscribe<T>(stream)); | |
} | |
protected RxStreams(OnStreamSubscribe<T> func) { | |
super(func); | |
} | |
protected static class OnStreamSubscribe<T> implements OnSubscribe<T> { | |
private final Stream<T> stream; | |
public OnStreamSubscribe(Stream<T> stream) { | |
this.stream = stream; | |
} | |
@Override public void call(Subscriber<? super T> subscriber) { | |
for (Iterator<T> i = stream.iterator(); i.hasNext(); ) { | |
if (subscriber.isUnsubscribed()) { | |
return; | |
} | |
subscriber.onNext(i.next()); | |
} | |
subscriber.onCompleted(); | |
} | |
} | |
public static void main(String... args) { | |
Observable<String> o1 = RxStreams.from(Stream.of("Hey", "Hi", "Hello")); | |
o1.map(s -> "Greeting: " + s).subscribe(System.out::println); | |
Observable<String> o2 = RxStreams.from(IntStream.range(0, 100).mapToObj(String::valueOf)); | |
o2.skip(60).buffer(10).subscribe(System.out::println); | |
} | |
} |
@nottmey: Thanks very much, I had been looking for this for a while now! I am a bit confused, though: which signature of Observable.from
is used when invoked with a callable? I cannot seem to find it: http://reactivex.io/RxJava/javadoc/rx/Observable.html
Or, more succinctly, you can use:
Stream stream = ...
Observable.from(stream::iterator);
...and to go back the other way - I worked this out: (RxJava Observable-to-java.util.Stream)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hello Jake,
isn't it better to use this simpler version?