Created
May 21, 2016 13:45
-
-
Save JensRantil/9d5296f67c32988c41e6b58b03e21252 to your computer and use it in GitHub Desktop.
Example of an `OperatorObserveOn` alternative. Not sure it works, though.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package my.application; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.TimeUnit; | |
import rx.Observable.Operator; | |
import rx.Subscriber; | |
import my.application.ExecutorServiceUtils; | |
import my.application.User; | |
public class ExecutorOperator implements Operator<User, User> { | |
private final ExecutorService executor; | |
private final long gracefulShutDownTimeout; | |
private final TimeUnit gracefulShutDownTimeoutUnit; | |
public ExecutorOperator(ExecutorService executor, long gracefulShutDownTimeout, | |
TimeUnit gracefulShutDownTimeoutUnit) { | |
this.executor = executor; | |
this.gracefulShutDownTimeout = gracefulShutDownTimeout; | |
this.gracefulShutDownTimeoutUnit = gracefulShutDownTimeoutUnit; | |
} | |
// NOTE that this class only delegates onNext to the executor. onCompleted and onError will not be delegated! | |
@Override | |
public Subscriber<? super User> call(final Subscriber<? super User> s) { | |
return new Subscriber<User>() { | |
/** | |
* @return if shutdown was succesful | |
*/ | |
private boolean shutdownExecutor() { | |
// The shutdown time is essentially the time it can take to finish executing #queueLimit items. | |
// Make the timeout | |
// configurable if needed. | |
return ExecutorServiceUtils.shutdownNotMonitoredExecutor("executor", executor, | |
gracefulShutDownTimeout, gracefulShutDownTimeoutUnit); | |
} | |
@Override | |
public void onCompleted() { | |
if (!shutdownExecutor()) { | |
if (!s.isUnsubscribed()) { | |
s.onError(new RuntimeException("Could not shutdown the executor.")); | |
} | |
} else { | |
if (!s.isUnsubscribed()) { | |
s.onCompleted(); | |
} | |
} | |
} | |
@Override | |
public void onError(Throwable e) { | |
if (!shutdownExecutor()) { | |
e = new RuntimeException("Could not shutdown the executor, after a previous error.", e); | |
} | |
if (!s.isUnsubscribed()) { | |
s.onError(e); | |
} | |
} | |
@Override | |
public void onNext(final User t) { | |
if (s.isUnsubscribed()) { | |
return; | |
} | |
executor.execute(new Runnable() { | |
@Override | |
public void run() { | |
if (!s.isUnsubscribed()) { | |
// Important to check this again, since it might take some time before this Runnable is | |
// executed. | |
s.onNext(t); | |
} | |
} | |
}); | |
} | |
}; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment