Last active
August 29, 2015 14:17
-
-
Save JensRantil/3d1af12bfb6dbebd3be1 to your computer and use it in GitHub Desktop.
A test that uses Observables for progress reporting
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package my.project.test; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.TimeoutException; | |
import org.junit.Test; | |
import com.google.common.base.Optional; | |
import org.elasticsearch.common.base.Objects; | |
import rx.Observable; | |
import rx.Observable.OnSubscribe; | |
import rx.Subscriber; | |
import rx.functions.Func1; | |
import rx.observables.ConnectableObservable; | |
public class ObservableProgressTest { | |
private static enum TaskState { | |
DONE, RUNNING | |
} | |
private static class TaskStateFilter implements Func1<TaskStatus<String>, Boolean> { | |
private TaskState state; | |
public TaskStateFilter(TaskState state) { | |
this.state = state; | |
} | |
@Override | |
public Boolean call(TaskStatus<String> arg0) { | |
return arg0.state.equals(state); | |
} | |
} | |
private static class TaskStatus<T> { | |
public static <T> TaskStatus<T> createProgress(int processPercentage) { | |
return new TaskStatus<T>(TaskState.RUNNING, Optional.<T> absent(), processPercentage); | |
} | |
public static <T> TaskStatus<T> createResult(T result) { | |
return new TaskStatus<T>(TaskState.DONE, Optional.of(result), 100); | |
} | |
public final int progressPercentage; // Between 0-100 | |
public final Optional<T> result; | |
public final TaskState state; | |
private TaskStatus(TaskState state, Optional<T> result, int progressPercentage) { | |
super(); | |
this.state = state; | |
this.result = result; | |
this.progressPercentage = progressPercentage; | |
} | |
@Override | |
public String toString() { | |
return Objects.toStringHelper(this.getClass()).add("state", state).add("result", result) | |
.add("progressPercentage", progressPercentage).toString(); | |
} | |
}; | |
private void handle(Observable<TaskStatus<String>> finishDirectly) { | |
ConnectableObservable<TaskStatus<String>> progress = finishDirectly.timeout(1, TimeUnit.SECONDS).publish(); | |
// Notifying the client(s) about the progress. | |
progress.takeWhile(new TaskStateFilter(TaskState.RUNNING)).subscribe(new Subscriber<TaskStatus<String>>() { | |
@Override | |
public void onCompleted() { | |
System.out.println("Telling the clients we are done."); | |
} | |
@Override | |
public void onError(Throwable e) { | |
System.out.println("Telling the clients we ran into an error."); | |
if (e instanceof TimeoutException) | |
System.out.println("Reason: Timed out."); | |
} | |
@Override | |
public void onNext(TaskStatus<String> t) { | |
System.out.println("Progress: " + t.toString()); | |
} | |
}); | |
// Handling the final result. | |
progress.first(new TaskStateFilter(TaskState.DONE)).subscribe(new Subscriber<TaskStatus<String>>() { | |
private boolean resultYielded = false; | |
@Override | |
public void onCompleted() { | |
if (!resultYielded) | |
throw new IllegalStateException("Completed without a result."); | |
} | |
@Override | |
public void onError(Throwable e) { | |
// Deliberately left empty. | |
} | |
@Override | |
public void onNext(TaskStatus<String> t) { | |
resultYielded = true; | |
System.out.println("Done: " + t.toString()); | |
} | |
}); | |
progress.connect(); | |
} | |
private Observable<TaskStatus<String>> sleepingEmitter(final int ms) { | |
return Observable.create(new OnSubscribe<TaskStatus<String>>() { | |
@Override | |
public void call(Subscriber<? super TaskStatus<String>> t1) { | |
try { | |
Thread.sleep(ms); | |
} catch (InterruptedException e) { | |
// Deliberately ignored. | |
} | |
} | |
}); | |
} | |
@Test | |
public void testFinishingDirectly() { | |
handle(Observable.just(TaskStatus.createResult("The result"))); | |
} | |
@Test | |
public void testProgressThenFinish() { | |
handle(Observable.just(TaskStatus.<String> createProgress(45), TaskStatus.createResult("The final result."))); | |
} | |
@Test | |
public void testTimeout() { | |
handle(sleepingEmitter(2000)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment