Skip to content

Instantly share code, notes, and snippets.

@SergejIsbrecht
Last active November 2, 2016 19:44
Show Gist options
  • Save SergejIsbrecht/e4bed81320a1738834dedb5da67a426d to your computer and use it in GitHub Desktop.
Save SergejIsbrecht/e4bed81320a1738834dedb5da67a426d to your computer and use it in GitHub Desktop.
@org.junit.Test
public void name() throws Exception {
Observable<String> obs = Observable.create(customLogic());
}
private Observable.OnSubscribe<String> customLogic() {
return subscriber -> {
try {
if (!subscriber.isUnsubscribed()) {
Scanner scanner = new Scanner("wurst.file.c");
subscriber.add(Subscriptions.create(scanner::close));
while (scanner.hasNext()) {
String line = scanner.nextLine();
if (subscriber.isUnsubscribed()) {
break;
}
subscriber.onNext(line);
}
subscriber.onCompleted();
}
} catch (Exception e) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(e);
}
}
};
}
@Test
public void t1() throws Exception {
Func0<Scanner> createFactory = () -> {
return new Scanner("wurst.file.c");
};
Func1<Scanner, Observable<String>> observableFactory = (resource) -> {
return PublishSubject.<String>create(subscriber -> {
while (resource.hasNext()) {
String line = resource.nextLine();
subscriber.onNext(line);
}
subscriber.onCompleted();
}).asObservable();
};
Action1<Scanner> disposeFactory = (resource) -> {
resource.close();
};
Observable<String> using = Observable.using(createFactory, observableFactory, disposeFactory);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment