@searler
Last active August 29, 2015 13:56
RxJava I/O using recursive scheduler
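import java.io.IOException;

// Package names below assume the RxJava 0.16.x layout; later releases moved these classes.
import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.schedulers.Schedulers;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

public class Recursive {
    public static void main(String[] args) throws InterruptedException {
        final Scheduler scheduler = Schedulers.newThread();
        Observable<Character> obs = Observable
                .create(new OnSubscribeFunc<Character>() {
                    @Override
                    public Subscription onSubscribe(
                            final Observer<? super Character> observer) {
                        // Recursive scheduling: each run reads one character,
                        // then calls inner to schedule the next read.
                        return scheduler.schedule(new Action1<Action0>() {
                            @Override
                            public void call(Action0 inner) {
                                int b;
                                try {
                                    b = System.in.read();
                                } catch (IOException e) {
                                    // Report the failure instead of swallowing it.
                                    observer.onError(e);
                                    return;
                                }
                                if (b == -1) {
                                    // End of input: stop rather than loop forever.
                                    observer.onCompleted();
                                    return;
                                }
                                Character c = (char) b;
                                observer.onNext(c);
                                if (c.equals('!'))
                                    observer.onCompleted();
                                else
                                    inner.call();
                            }
                        });
                    }
                });
        // Block the main thread and echo each character until completion.
        obs.toBlockingObservable().forEach(new Action1<Character>() {
            @Override
            public void call(Character c) {
                System.err.print(c);
            }
        });
    }
}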
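
The self-rescheduling idea in the gist can also be sketched with only the JDK, which may be handy where the old RxJava 0.x scheduler API is not available. This is an illustrative sketch and not the gist author's code: the names `RecursiveSketch` and `readUntilBang` are hypothetical, a single-thread `ExecutorService` stands in for the RxJava scheduler, and a read failure is simply treated as end of input.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class RecursiveSketch {

    /**
     * Reads characters from in on one worker thread, handing each to sink,
     * until '!' has been delivered or the stream ends. Blocks the caller
     * until reading has finished. (Hypothetical helper for illustration.)
     */
    static void readUntilBang(final InputStream in, final Consumer<Character> sink)
            throws InterruptedException {
        final ExecutorService worker = Executors.newSingleThreadExecutor();
        final CountDownLatch done = new CountDownLatch(1);

        Runnable step = new Runnable() {
            @Override
            public void run() {
                int b;
                try {
                    b = in.read();
                } catch (IOException e) {
                    b = -1; // assumption: treat a read failure as end of input
                }
                if (b == -1) {
                    done.countDown();
                    return;
                }
                char c = (char) b;
                sink.accept(c);
                if (c == '!') {
                    done.countDown();
                } else {
                    // The "recursive" part: queue this same step again
                    // instead of looping inside one long-running task.
                    worker.execute(this);
                }
            }
        };

        worker.execute(step);
        try {
            done.await();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final StringBuilder seen = new StringBuilder();
        InputStream in = new ByteArrayInputStream(
                "hi!ignored".getBytes(StandardCharsets.US_ASCII));
        readUntilBang(in, c -> seen.append(c.charValue()));
        System.out.println(seen); // prints "hi!"
    }
}
```

Because each step is queued separately, the worker thread returns to the executor between reads, which mirrors how `inner.call()` hands control back to the scheduler in the gist. Passing `System.in` instead of the `ByteArrayInputStream` would give the interactive behaviour.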