This guide is a first draft (that will end up in the official docs) on writing resilient code for production with the Couchbase Java SDK. At the end, the reader will be able to write code that withstands bugs, latency issues or anything else that can make their application fail.
Note that lots of concepts can be applied for both synchronous and asynchronous access. When necessary, both patterns are discussed separately. Also, the focus is on database interaction, but if you are using RxJava as part of your stack you can apply most of the principles there as well (and should!).
When working with Observables, it is important to understand the difference between cold and hot. Cold Observables will start to emit events once a Observer subscribes, and will do it "fresh" for each Observer. Hot Observables instead are starting to emit data as soon as it becomes available, and will return the same (or parts of the same) to each Observer. Hot Observables are also called Subjects in RxJava.
This is important, because the Couchbase Java SDK uses Hot Observables for each database operation. There are some architectural reasons for this, but also just the fact that if you do a get and subscribe twice, you don't want two network operations going out instead of one and share the result.
That said, RxJava provides ways to make hot Observables cold and vice versa. One of the most important things to remember is that wen you resubscribe to a hot Observable, it will emit the same data again (and not do the operation again agains the Server). This is okay in many cases, but not if you want to retry an operation because it has failed.
So while the following code is wrong because it will give you the same result over and over again:
// Will just retry on the same result from the first get request
bucket.async().get("id").retry(5).subscribe();
This is correct, beacuse it will produce a brand new Observable every time you retry (resubscribe).
// Will correctly do a new get operation against the server
Observable
.defer(new Func0<Observable<JsonDocument>>() {
@Override
public Observable<JsonDocument> call() {
return bucket.async().get("id");
}
})
.retry(5)
.subscribe();
So, use Observable#defer() if you want a new Observable for every subscriber. You can also use this technique if you want to defer the execution of an observable until someone subscribes.
If you want to turn a cold into a hot observable, take a look at the Observable#cache() or Observable#replay() operators.
One last thing: the Observable is not converted from cold to hot just because a hot Observable is flatMapped, so this code works perfectly fine:
Observable
.just("id")
.flatMap(new Func1<String, Observable<JsonDocument>>() {
@Override
public Observable<JsonDocument> call(String id) {
return bucket.async().get(id);
}
})
.retry(5)
.subscribe();
This chapter discusses different strategies to mitigate errors that may come up during operations (covered in the next Section, "Error Causes"). Some of them are shown to make a point, but the techniques apply to all different types of errors and can be applied as you see fit.
Logging errors is always important, but even more so with reactive applications. Because of the event driven nature, stack traces get harder to look at, and caller context is sometimes lost.
RxJava provides operators for side effects, which should be used to log errors. Of course, you can also put logging into the error handlers, but readability is increased if the logging is put explicitly as a side effect.
Observable
.error(new Exception("I'm failing"))
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
// I'm an explicit side effect
// use a proper logger of your choice here
LOGGER.warn("Error while doing XYZ", throwable);
}
})
.subscribe();
We also recommend to configure your logger to include absolute timestamps. While this is always a good idea, combined with good logging throughout the application, it makes it easier for you to debug error cases and see later what was going on inside your reactive application.
You can also utilize the various other side effect operators for general logging (doOnNext
, doOnCompleted
). If you don't want to have different side effects for the same logging operation, you can use doOnEach
, which will be called for both errors and next events.
Failing is the easiest way to handle errors - because you don't. While of course most of the time you want more sophisticated error handling strategies (as dicussed later), sometimes you just need to fail. Some errors just do not make sense to be retried (because they are not transient) or you already tried everything to make it work, but it still keeps failing.
In error resilient architectures, you want to do everything to keep the error contained, but if the containment is not able to handle the error it needs to propagate it to a parent component that (maybe) can.
In the async case, errors are events like every other for your subscribers. Once an error happens, your Subscriber is notified in its onError(Throwable)
method and you can handle it the way you want to. Note that by Observable contract, after an onError
event, no more onNext
events will happen.
Observable
.error(new Exception("I'm failing"))
.subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
System.err.println("Got Error: " + e);
}
@Override
public void onNext(Object o) {
}
});
If your subscriber does not handle the onError
and an error happens, you will see an exception in your logs:
Exception in thread "main" rx.exceptions.OnErrorNotImplementedException: I'm failing
at rx.Observable$31.onError(Observable.java:7204)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:127)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:96)
at rx.Observable$ThrowObservable$1.call(Observable.java:9171)
at rx.Observable$ThrowObservable$1.call(Observable.java:9161)
at rx.Observable.subscribe(Observable.java:7463)
at rx.Observable.subscribe(Observable.java:7195)
This indicates that its always a good idea to implement error handling.
In the synchronous case, every error is converted into a Exception and thrown, so you can use regular try/catch
semantics.
try {
Object data = Observable
.error(new Exception("I'm failing"))
.toBlocking()
.single();
} catch(Exception ex) {
System.err.println("Got Exception: " + ex);
}
If you do not catch the Exception, it will bubble up:
Exception in thread "main" java.lang.RuntimeException: java.lang.Exception: I'm failing
at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:482)
at rx.observables.BlockingObservable.single(BlockingObservable.java:349)
Retrying operations is a common technique to ride over transient errors. It should not be used for non-transient errors, because it will only put load onto the system without the chance to resolve the error.
In practice, the following retry strategies can be applied when a transient error is discovered:
- Retry immediately
- Retry with a fixed delay
- Retry with a liniearly increasing delay
- Retry with an exponential increasing delay
- Retry with a random delay
Unless you have a very good reason to not to, always apply a maximum number of tries and then escalate the error. Systems stuck in infinite retry loops can cause issues that are very hard to debug. It's better to fail and propagate at some point.
Also, we recommend that you use asynchronous retry even if you are blocking at the very end. Retrying in the asynchronous Observables is way more resource efficient and also the only sane way to handle multiple operation steps (and bulk operations) under a single timeout (more on that in the "Timeout" section).
We will provide retry handlers for the most common scenarios in a future release so you don't need to implement this on your own. For now, please use the code provided in this documentation (or variations of it).
Let's get one thing straight right away: immediately retrying is almost never a good idea. Instead of resolving the error more quickly, it will put more pressure onto the retried system and there is a good chance it will make resolving errors harder.
One good reason to do so is if you have a specific operation with a very short timeout that you want to keep retrying for a small, fixed amount of times and if it still does not work, fail fast.
If you have the feeling you need to retry very quickly, you can also apply a very slight increasing delay to at least release some pressure from the target system.
RxJava provides the retry
operator to resubscribe to the source Observable immediately once it fails (an error event happens). Three flavours are available:
retry()
: Instantly retry as long as the source Observable emits an error. We strongly recommend to not use it, instead use:retry(long count)
: Instantly retry as long as the source Observable emits an error or the max count is reached. If the count is reached, the Observable will not be resubscribed, but the error is propagated down the stream.retry(Func2<Integer, Throwable, Boolean> predicate)
: Instantly retry as long as the predicate returns true. Arguments to the predicate are the number of tries, as well as the exception type.
Since the predicate method provides the most flexibility, we recommend using that one. So if you only want to handle a specific exception and retry a maximum of MAX_TRIES times, you can do it like this:
Observable
.error(new CASMismatchException())
.retry(new Func2<Integer, Throwable, Boolean>() {
@Override
public Boolean call(Integer tries, Throwable throwable) {
return (throwable instanceof CASMismatchException) && tries < MAX_TRIES;
}
})
.subscribe();
Try replacing the CASMismatchException
with something else and you will see that it does not try to retry, but rather populates the error downstream. You can use this technique to handle specific errors differently by adding more retry operators in the pipeline.
Using the retry with predicate also allows you to log the number of retries for a specific error. If you use the doOnError
for logging in this case it's harder to log the number of retries.
The synchronous equivalent to the latest code looks like this:
int tries = 0;
while(true) {
tries++;
try {
pretendWorkThatMaybeThrows(); // does some work and maybe throws
break;
} catch(Throwable throwable) {
if (!(throwable instanceof CASMismatchException) || tries >= MAX_TRIES) {
throw throwable; // rethrow exceptions
}
}
}
When applying a retry with delay, the main question you need to ask yourself is: how often and how long is it feasible to retry before giving up (and escalate the error). This highly depends on the type of operation, use case and SLA that the application requires, but the techniques are the same.
RxJava provides the retryWhen
operator, which allows you more flexibility on the actions performed as well as when the resubscription is happening. This chapter covers the different delay approaches based on this operator.
Here is the contract for retryWhen
that you should always keep in mind:
- It is called when an error on the source Observable happens.
- The function provided will be called with an Observable containing this error.
- If you make this Observable error, it is propagated downstream (without retrying).
- If you make this Observable complete, it is propagated downstream (without retrying).
- In all other cases, a retry will happen.
The easiest approach is the fixed delay. The source observable will be resubscribed after a specified amount of time and for a fixed maximum number of times.
Because the nested logic is a bit harder to understand in the first place, let's talk through it step by step and then put it together.
Our retryWhen
function is called every time an error happens on the source Observable. If we wanted to try forever every second, it can look like this:
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> errorNotification) {
return errorNotification.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
return Observable.timer(1, TimeUnit.SECONDS);
}
});
}
})
We flatMap
our notification Observable and utilize the Observable#timer
to defer emitting a new event for a second. Now since we need to stop at some point (after a given number of tries), we can utilize the Observable#zipWith
operator to zip our error stream together with a range where we specify the number of tries we want to allow. Zipping has the nice side effect that once one of the Observable is completed, the result Observable will also be complete, which triggers our Rule 4 from above.
So a modified version looks like:
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> errorNotification) {
return errorNotification
.zipWith(Observable.range(1, 4), new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer attempts) {
return attempts;
}
})
.flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer attempts) {
return Observable.timer(1, TimeUnit.SECONDS);
}
});
}
})
Technically, we don't need the zip function here because we ignore it later on, but it is required for the zipWith
operator to work. We use the Observable#range
operator to create an Observable that emits 3 events and then completes, so we will never end up with more retries.
There is one more enhancement needed: the code as it stands there will swallow the originating exception when moving on, which is not good because it should be propagated if it can't be handled in this code block.
The following code is modified so that the function of zipWith
does not only return the attempt count, but also the throwable, so we have access to it in the flatMap
method. For this, the Java client has a generic Tuple
we can utilize. In the flatMap
we check for the number of attempts and if we are over our threshold we rethrow the exception. Keep in mind that we need to change Observable#range
call to MAX_ATTEMPTS+1, to give our code a chance to be called again one final time.
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> errorNotification) {
return errorNotification
.zipWith(Observable.range(1, 5), new Func2<Throwable, Integer, Tuple2<Throwable, Integer>>() {
@Override
public Tuple2<Throwable, Integer> call(Throwable throwable, Integer attempts) {
return Tuple.create(throwable, attempts);
}
})
.flatMap(new Func1<Tuple2<Throwable, Integer>, Observable<?>>() {
@Override
public Observable<?> call(Tuple2<Throwable, Integer> attempt) {
if (attempt.value2() == 3) {
return Observable.error(attempt.value1());
}
return Observable.timer(1, TimeUnit.SECONDS);
}
});
}
})
If you want to enhance it even further, you can add one more if()
clause in the flatMap
to see if the throwable that is passed down is actually one we want to retry.
Functionality like this is a great candidate to be generic and encapsulated, so we are working on providing similar functionaly out of the box. If you are using Java 8 already, the code becomes more condensed as well:
.retryWhen(notification ->
notification
.zipWith(Observable.range(1, 5), Tuple::create)
.flatMap(att ->
att.value2() == 3 ? Observable.error(att.value1()) : Observable.timer(1, TimeUnit.SECONDS)
)
)
Here are the variations for linear, exponential and random delays:
Linear:
// Utilizes the number of attempts for the number of seconds to wait
.retryWhen(notification ->
notification
.zipWith(Observable.range(1, 5), Tuple::create)
.flatMap(att ->
att.value2() == 3 ? Observable.error(att.value1()) : Observable.timer(att.value2(), TimeUnit.SECONDS)
)
)
Exponential:
// Uses the timer with 2^attempts to generate exponential delays
.retryWhen(notification ->
notification
.zipWith(Observable.range(1, 5), Tuple::create)
.flatMap(att ->
att.value2() == 3 ? Observable.error(att.value1()) : Observable.timer(1 << att.value2(), TimeUnit.SECONDS)
)
)
Random:
// Random between 0 and 5 seconds to retry per attempt
.retryWhen(notification ->
notification
.zipWith(Observable.range(1, 5), Tuple::create)
.flatMap(att ->
att.value2() == 3 ? Observable.error(att.value1()) : Observable.timer(new Random().nextInt(5), TimeUnit.SECONDS)
)
)
With synchronous code, there are not many options other than using Thread.sleep()
to keep the current thread waiting until the loop is allowed to proceed:
// Linear Backoff
int tries = 0;
while(true) {
tries++;
try {
pretendWorkThatMaybeThrows(); // does some work and maybe throws
break;
} catch(Throwable throwable) {
if (!(throwable instanceof CASMismatchException) || tries >= MAX_TRIES) {
throw throwable; // rethrow exceptions
}
}
Thread.sleep(TimeUnit.SECONDS.toMillis(tries));
}
You can then use the same approaches as with the asynchronous ones on the Thread.sleep()
time to accomodate for a static, linear, exponential or random delay.
Instead of (or in addition to) retrying, another valid option is falling back to either a different Observable or a default value.
RxJava provides you with different operators, prefixed with onError*()
:
onErrorReturn(Func1<Throwable, T>)
: Called when the source Observable errors and allows to return custom data instead.onErrorResumeNext(Observable<?>)
: Called when the source Observable errors and allows to resume transparently with a different Observable.onErrorResumeNext(Func1<Throwable, Observable<?>)
: Called when the source Observable errors and allows to transparently resume with an Observable (based on a specific Throwable).
You should use the onErrorReturn
if you want to fallback to static data quickly. For example:
Observable
.<String>error(new Exception("I failed"))
.onErrorReturn(new Func1<Throwable, String>() {
@Override
public String call(Throwable throwable) {
// You could return data based on the throwable as well
return "Default";
}
})
.subscribe();
If you only want to return default values based on a specific exception or even call another Observable as fallback, onErrorResumeNext
is for you.
Observable
.<String>error(new TimeoutException("I failed"))
.onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() {
@Override
public Observable<? extends String> call(Throwable throwable) {
if (throwable instanceof TimeoutException) {
return Observable.just("Default");
}
// Forward anything other than the TimeoutException
return Observable.error(throwable);
}
})
.subscribe();
If you just want to fallback onto another Observable that you have in scope without caring about the Exception, you can use the other onErrorResumeNext()
overload. For example, this loads data from all replicas if the get()
call did not succeed with the Java SDK:
bucket
.async()
.get("id")
.onErrorResumeNext(bucket.async().getFromReplica("id", ReplicaMode.ALL))
.subscribe();
Synchronous fallbacks can be implemented by conditionally setting the default in the catch
clause:
String value;
try {
value = pretendWorkThatMaybeThrows();
} catch(Exception ex) {
value = "Default";
}
Here is the gotcha though: this only works great if the fallback is static. If you need to fallback into another database call for example, you quickly get into nested error handling and timeouts are a pain to handle (since they start to accumulate for every synchronous call). We recommend you to use asynchronous fallbacks and then block at the very end through toBlocking().single()
or equivalents.
Another possibility that requires intervention on the application side are Observables that do not emit a single value. This can be because operators filtered the Observable so that nothing is left, or they did not produce any values in the first place. One common case in the Java SDK is get()
anf its cousins. If the Document is not found, the Observable will complete without emitting anything.
RxJava provides helper operators which all end with *OrDefault()
and allow you to return default values if no item is emitted when the Observable completes.
In most cases you want to use singleOrDefault()
and return a default value when not a single item is emitted by the source Observable:
Observable
.<String>empty()
.singleOrDefault("Default")
.subscribe();
If you are dealing with potentially more than one item emitted in your Observable and you only want to emit either the first or the last value, there are also operators that allow you to emit a default if it's unexpectedly empty. See firstOrDefault()
as well as lastOrDefault()
for more details.
Bulk operations are used to handle more data in one batch (and therefore benefit from better resource utilization), but error handling becomes more complicated. There are three high level cases to consider:
- "Best Effort": keep going and just use the results that succeeded when errors happen.
- "Full Retry": Retry the complete Observable when errors happen.
- "Incremental Retry": Only retry specific events of the Observable.
Before we dig into the specific approaches, let's revisit the contract of Observables:
onEvent* (onError | onComplete)
Zero or more events are followed by either an error event or a complete event. This provides an important clue right away: once our observable fails, no more events can be passed through. Therefore, you need to make sure that errors are handled at the smallest scope possible, only letting it proceed if you cannot handle it right away.
Sometimes it is more important to get data in a timeframe (maybe with a short timeout) then getting all data. Of course in general you want to keep a timeout that lets you fetch all the data, but depending on the use case you are fine with only getting a subset of the data returned.
To ignore errors and turn them into "noops", you can utilize onErrorResumeNext()
:
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("A");
subscriber.onNext("B");
subscriber.onError(new IllegalStateException("Woops"));
subscriber.onNext("C");
}
})
.onErrorResumeNext(Observable.<String>empty())
.toBlocking()
.forEach(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("Got: " + s);
}
});
This will not raise any exception to your calling thread, but it will never process "C", because based on the Observable contract, once onError
is called, no more events are allowed to be generated.
In order to keep going in case events fail, you need to turn each event into a single Observable and then merge them back together to either defer the error handling (through Observable#mergeDelayError
) or use flatMap
and make sure to contain the errors before flattening.
This code provides some fake data you can work with. It will emit 4 observables where one of them will fail:
Observable<Observable<String>> dataObservables = Observable
.just("a", "b", "c", "d")
.map(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
if (s.equals("b")) {
return Observable.error(new IllegalStateException("I dont like b"));
}
return Observable.just(s);
}
});
You can then use mergeDelayError
to defer error handling until the end. If you uncomment the onErrorResumeNext
it will silently discard any errors as well, leaving you with an Observable that provides best-effort processing of source Observables.
Observable.mergeDelayError(dataObservables)
//.onErrorResumeNext(Observable.<String>empty())
.toBlocking()
.forEach(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
Alternatively, you can use flatMap
and make sure the errors are contained for each emitted Observable:
Observable
.just("a", "b", "c", "d")
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
// Simulate some observable that sometimes fails
Observable<String> obs;
if (s.equals("b")) {
obs = Observable.error(new IllegalStateException("I dont like b"));
} else {
obs = Observable.just(s);
}
// Here comes the actual handling part before we flatten it back
return obs.onErrorResumeNext(Observable.<String>empty());
}
})
.toBlocking()
.forEach(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
As a more practical example, here is a best effort bulk loading of documents from Couchbase Server that just discards operations that fail:
private static Observable<JsonDocument> loadDocsBestEffort(Bucket bucket, List<String> ids) {
return Observable
.from(ids)
.flatMap(new Func1<String, Observable<JsonDocument>>() {
@Override
public Observable<JsonDocument> call(String id) {
return bucket
.async()
.get(id)
.onErrorResumeNext(Observable.<JsonDocument>empty());
}
});
}
Of course, you can add more logic onto each emitted Observable and also proper logging so that you at least know which errors happened.
TODO
**TODO
- batching ops together with operators
- semaphores
This chapter discusses errors that need to be covered by the application developer to make the application resilient.
- worst nightmare and safety net at the same time.
- so many possibilities for timeouts
- how to apply timeouts to async workflows
- how timeouts are used in the sync wrappers
- difference between global timeout and single-timeout ops
- when can it happen and what can you do agaisnt it?
- how to handle erros that come because of insers, locks, cas mismatches,...
- which errors can come up during rebalance, failover or even a failing node (before failover) and how to handle that
@benjchristensen I'd actually specifically asked if we could get Java 7 examples, as there's still a lot of people who want to use this that aren't quite ready for Java 8 yet. Also, although the Java 8 style is much more compact and easy to read, I think it can be a little difficult to get what is going on if it's your first exposure to it -- learning both Rx style and Java 8 style in one go.
Maybe the best thing here would be if we can get buttons that let you switch between java 7 and java 8 examples. Would that make sense, or would the surrounding text have to be too generic for that to work in practice? (maybe that's exactly what you were suggesting anyway??)