Skip to content

Instantly share code, notes, and snippets.

@davengeo
Last active October 27, 2015 13:54
Show Gist options
  • Save davengeo/5c638e5b73ca43bf22c1 to your computer and use it in GitHub Desktop.
Save davengeo/5c638e5b73ca43bf22c1 to your computer and use it in GitHub Desktop.
Testing artefact for TDD with Rx Observables
/*
* Copyright (c) 2015.
* David Navarro
* me@davengeo.com
*/
/**
 * Testing artefact that exposes a {@link SettableFuture} as a {@link Streamable}
 * of {@link EventContainer}, so a test can decide when (and how) the stream
 * produces its result.
 *
 * <p>The observable is created from the backing future on
 * {@link Schedulers#computation()}, logs every notification at debug level and
 * has a timeout of {@value #TIMEOUT_SECONDS} seconds applied to it. A test
 * completes the future through {@link #setEvent(EventContainer)} or
 * {@link #setError(Throwable)}.
 */
public class TestRxStreamble implements Streamable<EventContainer> {

    private static final Logger LOG = LoggerFactory.getLogger(TestRxStreamble.class);

    /** Timeout, in seconds, applied to the stream via the {@code timeout} operator. */
    private static final long TIMEOUT_SECONDS = 2;

    /**
     * Future backing the stream. It is final because {@link #stream} is bound to
     * this exact instance; package-private visibility is kept so existing
     * same-package code that reads it keeps compiling.
     */
    final SettableFuture<EventContainer> future = SettableFuture.create();

    /** Observable built once from {@link #future}; every call to {@link #stream()} returns it. */
    private final Observable<EventContainer> stream = Observable
            .from(future, Schedulers.computation())
            .doOnEach(new Action1<Notification<? super EventContainer>>() {
                @Override
                public void call(Notification<? super EventContainer> notification) {
                    // Trace the payload and the kind of notification for test diagnostics.
                    LOG.debug("value:{}", notification.getValue());
                    LOG.debug("next:{}, completed:{}, exception:{}",
                            notification.isOnNext(),
                            notification.isOnCompleted(),
                            notification.isOnError());
                }
            })
            .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);

    /**
     * Returns the observable backed by the settable future.
     *
     * @return the same observable instance on every call
     */
    @Override
    public Observable<EventContainer> stream() {
        return stream;
    }

    /**
     * Sets {@code event} as the value of the backing future.
     * The boolean result of {@code SettableFuture.set} is ignored.
     *
     * @param event the value to set on the future
     */
    public void setEvent(EventContainer event) {
        future.set(event);
    }

    /**
     * Sets {@code error} as the exception of the backing future.
     * The boolean result of {@code SettableFuture.setException} is ignored.
     *
     * @param error the exception to set on the future
     */
    public void setError(Throwable error) {
        future.setException(error);
    }

    /**
     * Reports whether the backing future is done.
     *
     * @return the result of {@code future.isDone()}
     */
    public boolean isDone() {
        return future.isDone();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment