Skip to content

Instantly share code, notes, and snippets.

@aztecrex
Last active August 29, 2015 14:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aztecrex/267ab17b0a57a8ac1506 to your computer and use it in GitHub Desktop.
Save aztecrex/267ab17b0a57a8ac1506 to your computer and use it in GitHub Desktop.
package demo;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Func0;
import rx.util.async.Async;
/**
** <p>
* Demonstrate attempts to get RxJava retry for asynchronous work chain. The use
* case that exposed this problem is reading and writing data with versioning
* for optimistic concurrency. The work is a series of async I/O operations that
* must be re-assembled from scratch if a stale version is detected on write.
* </p>
*
* <p>
* Four cases are demonstrated in this class:
* </p>
* <ul>
* <li>Case 1: perform the work and naiively apply a retry operator to the
* asynchronous work. This fails because the work itself is not retried on
* re-subscribe.</li>
* <li>Case 2: wrap the work in an observer that performs it on every
* subscription. A retry operator applied to the wrapper correctly re-attempts
* the work on failure. However, every subsequent subscriber to the result
* causes the work to be performed again.</li>
* <li>Case 3: Apply the cache operator to the result of the retry operator.
* This performs as desired.</li>
* <li>Case 4: Generalize the approach of case 3 and encapsulate it in an
* observable generator method. This shows that it is difficult to generalize
* this behavior because each retry operator form (number, predicate, perpetual)
* will require its own generator method.</li>
* </ul>
*
* <p>
* NOTE: this code does not work if compiled by the Eclipse (Keppler) compiler
* for Java 8. I have to compile with javac for it to work. There is some
* problem with Lambda class naming in the code generated by Eclipse.
* </p>
*
*
*/
public class AsyncRetryDemo {
public static void main(final String[] args) throws Exception {
new AsyncRetryDemo().case1();
new AsyncRetryDemo().case2();
new AsyncRetryDemo().case3();
new AsyncRetryDemo().case4();
// output is:
//
// case 1, sub 1: fail (max retries, called=1)
// case 1, sub 2: fail (max retries, called=1)
// case 2, sub 1: pass (called=2)
// case 2, sub 2: fail (called=3)
// case 3, sub 1: pass (called=2)
// case 3, sub 2: pass (called=2)
// case 4, sub 1: pass (called=2)
// case 4, sub 2: pass (called=2)
}
private static <R> Observable<R> retryAndCache(
final Func0<Observable<R>> binder, final int retries) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(final Subscriber<? super R> child) {
binder.call().subscribe(child);
}
})
.retry(retries)
.cache();
}
private final AtomicInteger called = new AtomicInteger();
private final CountDownLatch done = new CountDownLatch(2);
/**
* This represents a sequence of interdependent asynchronous operations that
* might fail in a way that prescribes a retry. In this case, all we are
* doing is squaring an integer asynchronously.
*
* @param input
* to the process.
*
* @return promise to perform the work and produce either a result or a
* suggestion to retry (e.g. a stale version error).
*/
private Observable<Integer> canBeRetried(final int a) {
final Observable<Integer> rval;
if (this.called.getAndIncrement() == 0) {
rval = Observable.error(new RuntimeException(
"we always fail the first time"));
} else {
rval = Async.start(() -> a * a);
}
return rval;
}
private void case1() throws InterruptedException {
/*
* In this case, we invoke the observable-creator to get the async
* promise. Of course, if it fails, any retry will fail as well because
* the failed result is computed one time and pushed to all subscribers
* forever.
*
* Thus this case fails because the first invocation of canBeRetried(..)
* always fails.
*/
final Observable<Integer> o = canBeRetried(2)
.retry(2);
check("case 1", o);
this.done.await();
}
private void case2() throws InterruptedException {
/*
* In this case, we wrap canBeRetried(..) inside an observer that
* invokes it on every subscription. So, we get past the retry problem.
* But every new subscriber after the retry succeeds causes the work to
* restart.
*/
final Observable<Integer> o = Observable.create(
new OnSubscribe<Integer>() {
@Override
public void call(final Subscriber<? super Integer> child) {
canBeRetried(2).subscribe(child);
}
})
.retry(2);
check("case 2", o);
this.done.await();
}
private void case3() throws InterruptedException {
/*
* In this case, we wrap canBeRetried(..) inside an observer that
* invokes it on every subscription. So, we get past the retry problem.
* We cache the result of the retry to solve the extra work problem.
*/
final Observable<Integer> o = Observable.create(
new OnSubscribe<Integer>() {
@Override
public void call(final Subscriber<? super Integer> child) {
canBeRetried(2).subscribe(child);
}
})
.retry(2)
.cache();
check("case 3", o);
this.done.await();
}
private void case4() throws InterruptedException {
/*
* Same as case 3 but we use the retryAndCache(..) to do the work for
* us.
*/
final Observable<Integer> o = retryAndCache(() -> canBeRetried(2), 2);
check("case 4", o);
this.done.await();
}
private void check(final String label, final Observable<Integer> promise) {
// does the work get retried on failure?
promise.subscribe(
v -> {
System.out.println(label + ", sub 1: "
+ (this.called.get() == 2 ? "pass" : "fail")
+ " (called=" + this.called.get() + ")");
},
x -> {
System.out.println(label
+ ", sub 1: fail (max retries, called="
+ this.called.get() + ")");
this.done.countDown();
}, () -> {
this.done.countDown();
});
// do subsequent subscribers avoid invoking the work again?
promise.subscribe(
v -> {
System.out.println(label + ", sub 2: "
+ (this.called.get() == 2 ? "pass" : "fail")
+ " (called=" + this.called.get() + ")");
},
x -> {
System.out.println(label
+ ", sub 2: fail (max retries, called="
+ this.called.get() + ")");
this.done.countDown();
}, () -> {
this.done.countDown();
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment