Skip to content

Instantly share code, notes, and snippets.

@JensRantil
Last active August 29, 2015 14:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JensRantil/3d1af12bfb6dbebd3be1 to your computer and use it in GitHub Desktop.
Save JensRantil/3d1af12bfb6dbebd3be1 to your computer and use it in GitHub Desktop.
Test to use Observables for progress reporting
package my.project.test;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Test;
import com.google.common.base.Optional;
import org.elasticsearch.common.base.Objects;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Func1;
import rx.observables.ConnectableObservable;
/**
 * Experiments with reporting both the progress and the final result of a task
 * through a single RxJava {@link Observable} of {@link TaskStatus} items.
 *
 * <p>The tests contain no assertions; they print what each subscriber receives.
 */
public class ObservableProgressTest {

    /** State carried by every {@link TaskStatus}. */
    private enum TaskState {
        DONE, RUNNING
    }

    /** Predicate accepting only statuses that are in one particular {@link TaskState}. */
    private static class TaskStateFilter implements Func1<TaskStatus<String>, Boolean> {
        private final TaskState state;

        /**
         * @param state the state a status must have to be accepted
         */
        public TaskStateFilter(TaskState state) {
            this.state = state;
        }

        /**
         * @param status the status to inspect
         * @return {@code true} if the status is in the state this filter was created with
         */
        @Override
        public Boolean call(TaskStatus<String> status) {
            // Enum constants are singletons, so an identity comparison is
            // sufficient and cannot throw on a null state.
            return status.state == state;
        }
    }

    /**
     * Immutable snapshot of a task: either an intermediate progress report or
     * the final result. Instances are created through the static factories.
     *
     * @param <T> type of the task's result
     */
    private static class TaskStatus<T> {
        public final int progressPercentage; // Between 0-100
        public final Optional<T> result;
        public final TaskState state;

        private TaskStatus(TaskState state, Optional<T> result, int progressPercentage) {
            this.state = state;
            this.result = result;
            this.progressPercentage = progressPercentage;
        }

        /**
         * Creates a {@link TaskState#RUNNING} status without a result.
         *
         * @param processPercentage the progress to report
         * @return the progress status
         */
        public static <T> TaskStatus<T> createProgress(int processPercentage) {
            return new TaskStatus<T>(TaskState.RUNNING, Optional.<T> absent(), processPercentage);
        }

        /**
         * Creates a {@link TaskState#DONE} status at 100 percent carrying the result.
         *
         * @param result the result of the task
         * @return the final status
         */
        public static <T> TaskStatus<T> createResult(T result) {
            return new TaskStatus<T>(TaskState.DONE, Optional.of(result), 100);
        }

        @Override
        public String toString() {
            return Objects.toStringHelper(this.getClass())
                    .add("state", state)
                    .add("result", result)
                    .add("progressPercentage", progressPercentage)
                    .toString();
        }
    }

    /**
     * Applies a one second timeout to the given stream, publishes it and
     * attaches two subscribers before connecting: one that prints progress
     * while statuses are {@link TaskState#RUNNING}, and one that prints the
     * first {@link TaskState#DONE} status.
     *
     * @param statuses the stream of task statuses to consume
     */
    private void handle(Observable<TaskStatus<String>> statuses) {
        ConnectableObservable<TaskStatus<String>> progress =
                statuses.timeout(1, TimeUnit.SECONDS).publish();

        // Notifying the client(s) about the progress.
        progress.takeWhile(new TaskStateFilter(TaskState.RUNNING))
                .subscribe(new Subscriber<TaskStatus<String>>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("Telling the clients we are done.");
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("Telling the clients we ran into an error.");
                        if (e instanceof TimeoutException) {
                            System.out.println("Reason: Timed out.");
                        }
                    }

                    @Override
                    public void onNext(TaskStatus<String> t) {
                        System.out.println("Progress: " + t.toString());
                    }
                });

        // Handling the final result.
        progress.first(new TaskStateFilter(TaskState.DONE))
                .subscribe(new Subscriber<TaskStatus<String>>() {
                    private boolean resultYielded = false;

                    @Override
                    public void onCompleted() {
                        // Defensive check: completing without ever having seen
                        // a DONE status is treated as a programming error.
                        if (!resultYielded) {
                            throw new IllegalStateException("Completed without a result.");
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        // Deliberately left empty: the progress subscriber
                        // above is the one that prints errors.
                    }

                    @Override
                    public void onNext(TaskStatus<String> t) {
                        resultYielded = true;
                        System.out.println("Done: " + t.toString());
                    }
                });

        // Both subscribers are attached; only now start the shared subscription.
        progress.connect();
    }

    /**
     * Creates an observable that, when subscribed to, sleeps on the subscribing
     * thread and then returns without emitting an item or a terminal event.
     *
     * @param ms how long to sleep, in milliseconds
     * @return the silent, sleeping observable
     */
    private Observable<TaskStatus<String>> sleepingEmitter(final int ms) {
        return Observable.create(new OnSubscribe<TaskStatus<String>>() {
            @Override
            public void call(Subscriber<? super TaskStatus<String>> subscriber) {
                try {
                    Thread.sleep(ms);
                } catch (InterruptedException e) {
                    // Stop sleeping, but restore the interrupt flag so code
                    // further up the stack can still observe the interruption.
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    /** Feeds a stream that consists of the final result only. */
    @Test
    public void testFinishingDirectly() {
        handle(Observable.just(TaskStatus.createResult("The result")));
    }

    /** Feeds a stream with one progress report followed by the final result. */
    @Test
    public void testProgressThenFinish() {
        handle(Observable.just(TaskStatus.<String> createProgress(45),
                TaskStatus.createResult("The final result.")));
    }

    /** Feeds a stream that stays silent for two seconds, longer than the one second timeout. */
    @Test
    public void testTimeout() {
        handle(sleepingEmitter(2000));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment