Skip to content

Instantly share code, notes, and snippets.

@JensRantil
Created May 21, 2016 13:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JensRantil/9d5296f67c32988c41e6b58b03e21252 to your computer and use it in GitHub Desktop.
Save JensRantil/9d5296f67c32988c41e6b58b03e21252 to your computer and use it in GitHub Desktop.
Example of an `OperatorObserveOn` alternative. Not sure it works, though.
package my.application;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import rx.Observable.Operator;
import rx.Subscriber;
import my.application.ExecutorServiceUtils;
import my.application.User;
/**
 * An Rx {@link Operator} that hands every {@code onNext} notification to an
 * {@link ExecutorService} instead of delivering it on the calling thread.
 *
 * <p>Only {@code onNext} is delegated to the executor. {@code onCompleted} and {@code onError}
 * are delivered on the thread that signalled them, after an attempt to shut the executor down.
 *
 * <p>NOTE(review): if the supplied executor runs more than one thread, downstream
 * {@code onNext} calls may presumably arrive out of order or concurrently; confirm the
 * executor used by callers is single-threaded.
 *
 * <p>NOTE(review): the executor is only shut down from {@code onCompleted}/{@code onError}.
 * If downstream unsubscribes early, nothing in this class shuts it down.
 */
public class ExecutorOperator implements Operator<User, User> {

  private final ExecutorService executor;
  private final long gracefulShutDownTimeout;
  private final TimeUnit gracefulShutDownTimeoutUnit;

  /**
   * @param executor the executor that downstream {@code onNext} calls are submitted to; it is
   *     shut down by this operator when the stream terminates
   * @param gracefulShutDownTimeout timeout passed to the executor shutdown helper
   * @param gracefulShutDownTimeoutUnit unit of {@code gracefulShutDownTimeout}
   */
  public ExecutorOperator(ExecutorService executor, long gracefulShutDownTimeout,
      TimeUnit gracefulShutDownTimeoutUnit) {
    this.executor = executor;
    this.gracefulShutDownTimeout = gracefulShutDownTimeout;
    this.gracefulShutDownTimeoutUnit = gracefulShutDownTimeoutUnit;
  }

  /**
   * Wraps the downstream subscriber {@code s} in a subscriber that forwards items through the
   * executor.
   *
   * @param s the downstream subscriber
   * @return the subscriber to be handed to the upstream source
   */
  @Override
  public Subscriber<? super User> call(final Subscriber<? super User> s) {
    // Chain to the downstream subscriber: passing s to the constructor shares its subscription
    // list (and forwards the producer), so that when downstream unsubscribes the upstream source
    // is unsubscribed too. With the no-arg constructor the source would keep emitting items
    // that are then merely dropped in onNext.
    return new Subscriber<User>(s) {

      /**
       * Shuts the executor down using the configured timeout.
       *
       * @return whether the shutdown was successful, as reported by
       *     {@code ExecutorServiceUtils.shutdownNotMonitoredExecutor}
       */
      private boolean shutdownExecutor() {
        // Presumably the timeout bounds how long already-queued onNext tasks are given to
        // finish -- TODO confirm against ExecutorServiceUtils.
        return ExecutorServiceUtils.shutdownNotMonitoredExecutor("executor", executor,
            gracefulShutDownTimeout, gracefulShutDownTimeoutUnit);
      }

      @Override
      public void onCompleted() {
        // Shut down first, so the terminal event is only sent after the shutdown attempt.
        if (!shutdownExecutor()) {
          if (!s.isUnsubscribed()) {
            s.onError(new RuntimeException("Could not shutdown the executor."));
          }
        } else {
          if (!s.isUnsubscribed()) {
            s.onCompleted();
          }
        }
      }

      @Override
      public void onError(Throwable e) {
        if (!shutdownExecutor()) {
          // Keep the original failure as the cause so it is not lost.
          e = new RuntimeException("Could not shutdown the executor, after a previous error.", e);
        }
        if (!s.isUnsubscribed()) {
          s.onError(e);
        }
      }

      @Override
      public void onNext(final User t) {
        if (s.isUnsubscribed()) {
          return;
        }
        // NOTE(review): an exception thrown by s.onNext on the executor thread is not routed
        // to s.onError here; confirm whether that needs handling.
        executor.execute(new Runnable() {
          @Override
          public void run() {
            // Important to check this again, since it might take some time before this Runnable
            // is executed.
            if (!s.isUnsubscribed()) {
              s.onNext(t);
            }
          }
        });
      }
    };
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment