@daschl
Last active August 26, 2020 23:17
Draft: Writing Code for Production

Writing Resilient Reactive Applications

This guide is a first draft (that will end up in the official docs) on writing resilient code for production with the Couchbase Java SDK. At the end, the reader will be able to write code that withstands bugs, latency issues or anything else that can make their application fail.

Note that lots of concepts can be applied for both synchronous and asynchronous access. When necessary, both patterns are discussed separately. Also, the focus is on database interaction, but if you are using RxJava as part of your stack you can apply most of the principles there as well (and should!).

RxJava 101 Recap: Cold and Hot Observables

When working with Observables, it is important to understand the difference between cold and hot. Cold Observables start to emit events once an Observer subscribes, and do it "fresh" for each Observer. Hot Observables instead start to emit data as soon as it becomes available, and return the same data (or parts of it) to each Observer. In RxJava, hot Observables are typically backed by Subjects.

This is important, because the Couchbase Java SDK uses hot Observables for each database operation. There are some architectural reasons for this, but there is also a practical one: if you do a get and subscribe twice, you want a single network operation whose result is shared, not two separate operations.

That said, RxJava provides ways to make hot Observables cold and vice versa. One of the most important things to remember is that when you resubscribe to a hot Observable, it will emit the same data again (and not perform the operation against the server again). This is okay in many cases, but not if you want to retry an operation because it has failed.

The following code is wrong, because it will give you the same result over and over again:

// Will just retry on the same result from the first get request
bucket.async().get("id").retry(5).subscribe();

This is correct, because it will produce a brand new Observable every time you retry (resubscribe):

// Will correctly do a new get operation against the server
Observable
    .defer(new Func0<Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call() {
            return bucket.async().get("id");
        }
    })
    .retry(5)
    .subscribe();

So, use Observable#defer() if you want a new Observable for every subscriber. You can also use this technique if you want to defer the execution of an observable until someone subscribes.
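
As a loose analogy in plain Java (no RxJava involved), a hot Observable behaves like a value that was fetched once and is handed to every caller, while defer() behaves like a factory that runs the operation again each time it is asked. The sketch below only illustrates that difference; HotVsDeferred, pretendGet and the other names are made up for this example and are not part of RxJava or the Couchbase SDK.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-Java analogy only; none of these names exist in RxJava or the Couchbase SDK.
final class HotVsDeferred {

    // Counts how often the pretend "network operation" is actually executed.
    static final AtomicInteger CALLS = new AtomicInteger();

    // Stand-in for a real database call; the suffix shows which execution produced the value.
    static String pretendGet(String id) {
        return id + "#" + CALLS.incrementAndGet();
    }

    // "Hot": the operation runs once, right now; every later read sees the same result.
    static Supplier<String> hot(String id) {
        final String result = pretendGet(id);
        return () -> result;
    }

    // "Deferred": nothing runs until someone asks, and every ask runs the operation again.
    static Supplier<String> deferred(String id) {
        return () -> pretendGet(id);
    }

    public static void main(String[] args) {
        Supplier<String> shared = hot("id");
        System.out.println(shared.get() + " " + shared.get()); // the same value twice
        Supplier<String> fresh = deferred("id");
        System.out.println(fresh.get() + " " + fresh.get());   // two different values
    }
}
```

Retrying against the shared supplier can never produce a different outcome, which is the same reason why retry() on a hot Observable needs defer() to be useful.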

If you want to turn a cold into a hot observable, take a look at the Observable#cache() or Observable#replay() operators.

One last thing: the Observable is not converted from cold to hot just because a hot Observable is flatMapped, so this code works perfectly fine:

Observable
    .just("id")
    .flatMap(new Func1<String, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(String id) {
            return bucket.async().get(id);
        }
    })
    .retry(5)
    .subscribe();

Error Recovery

This chapter discusses different strategies to mitigate errors that may come up during operations (the errors themselves are covered later, in "Error Causes"). Some of them are shown to make a point, but the techniques apply to all different types of errors and can be applied as you see fit.

Logging

Logging errors is always important, but even more so with reactive applications. Because of the event driven nature, stack traces get harder to look at, and caller context is sometimes lost.

RxJava provides operators for side effects, which should be used to log errors. Of course, you can also put logging into the error handlers, but readability is increased if the logging is put explicitly as a side effect.

Observable
    .error(new Exception("I'm failing"))
    .doOnError(new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            // I'm an explicit side effect
            // use a proper logger of your choice here
            LOGGER.warn("Error while doing XYZ", throwable); 
        }
    })
    .subscribe();

We also recommend configuring your logger to include absolute timestamps. While this is always a good idea, combined with good logging throughout the application it makes it easier to debug error cases and to see later what was going on inside your reactive application.

You can also utilize the various other side effect operators for general logging (doOnNext, doOnCompleted). If you don't want to have different side effects for the same logging operation, you can use doOnEach, which will be called for both errors and next events.

Failing

Failing is the easiest way to handle errors - because you don't. While of course most of the time you want more sophisticated error handling strategies (as discussed later), sometimes you just need to fail. For some errors a retry just does not make sense (because they are not transient), or you have already tried everything to make it work, but it still keeps failing.

In error resilient architectures, you want to do everything to keep the error contained, but if the containment is not able to handle the error it needs to propagate it to a parent component that (maybe) can.

In the async case, errors are events like every other for your subscribers. Once an error happens, your Subscriber is notified in its onError(Throwable) method and you can handle it the way you want to. Note that by Observable contract, after an onError event, no more onNext events will happen.

Observable
    .error(new Exception("I'm failing"))
    .subscribe(new Subscriber<Object>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
            System.err.println("Got Error: " + e);
        }

        @Override
        public void onNext(Object o) {
        }
    });

If your subscriber does not handle the onError and an error happens, you will see an exception in your logs:

Exception in thread "main" rx.exceptions.OnErrorNotImplementedException: I'm failing
    at rx.Observable$31.onError(Observable.java:7204)
    at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:127)
    at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:96)
    at rx.Observable$ThrowObservable$1.call(Observable.java:9171)
    at rx.Observable$ThrowObservable$1.call(Observable.java:9161)
    at rx.Observable.subscribe(Observable.java:7463)
    at rx.Observable.subscribe(Observable.java:7195)

This shows that it's always a good idea to implement error handling.

In the synchronous case, every error is converted into an Exception and thrown, so you can use regular try/catch semantics.

try {
    Object data = Observable
        .error(new Exception("I'm failing"))
        .toBlocking()
        .single();
} catch(Exception ex) {
    System.err.println("Got Exception: " + ex);
}

If you do not catch the Exception, it will bubble up:

Exception in thread "main" java.lang.RuntimeException: java.lang.Exception: I'm failing
    at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:482)
    at rx.observables.BlockingObservable.single(BlockingObservable.java:349)

Retry

Retrying operations is a common technique to ride over transient errors. It should not be used for non-transient errors, because it will only put load onto the system without the chance to resolve the error.

In practice, the following retry strategies can be applied when a transient error is discovered:

  • Retry immediately
  • Retry with a fixed delay
  • Retry with a linearly increasing delay
  • Retry with an exponentially increasing delay
  • Retry with a random delay

Unless you have a very good reason not to, always apply a maximum number of tries and then escalate the error. Systems stuck in infinite retry loops can cause issues that are very hard to debug. It's better to fail and propagate at some point.

Also, we recommend that you use asynchronous retry even if you are blocking at the very end. Retrying in the asynchronous Observables is way more resource efficient and also the only sane way to handle multiple operation steps (and bulk operations) under a single timeout (more on that in the "Timeout" section).

We will provide retry handlers for the most common scenarios in a future release so you don't need to implement this on your own. For now, please use the code provided in this documentation (or variations of it).

Retry without delay

Let's get one thing straight right away: immediately retrying is almost never a good idea. Instead of resolving the error more quickly, it will put more pressure onto the retried system and there is a good chance it will make resolving errors harder.

One good reason to do so is if you have a specific operation with a very short timeout that you want to keep retrying for a small, fixed number of times and, if it still does not work, fail fast.

If you have the feeling you need to retry very quickly, you can also apply a very slight increasing delay to at least release some pressure from the target system.

RxJava provides the retry operator to resubscribe to the source Observable immediately once it fails (an error event happens). Three flavours are available:

  • retry(): Instantly retry as long as the source Observable emits an error. We strongly recommend to not use it, instead use:
  • retry(long count): Instantly retry as long as the source Observable emits an error, until the max count is reached. Once the count is reached, the Observable will not be resubscribed, but the error is propagated down the stream.
  • retry(Func2<Integer, Throwable, Boolean> predicate): Instantly retry as long as the predicate returns true. Arguments to the predicate are the number of tries, as well as the Throwable that caused the failure.

Since the predicate method provides the most flexibility, we recommend using that one. So if you only want to handle a specific exception and retry a maximum of MAX_TRIES times, you can do it like this:

Observable
    .error(new CASMismatchException())
    .retry(new Func2<Integer, Throwable, Boolean>() {
        @Override
        public Boolean call(Integer tries, Throwable throwable) {
            return (throwable instanceof CASMismatchException) && tries < MAX_TRIES;
        }
    })
    .subscribe();

Try replacing the CASMismatchException with something else and you will see that it does not retry, but rather propagates the error downstream. You can use this technique to handle specific errors differently by adding more retry operators in the pipeline.

Using the retry with predicate also allows you to log the number of retries for a specific error. If you use the doOnError for logging in this case it's harder to log the number of retries.

The synchronous equivalent to the latest code looks like this:

int tries = 0;
while(true) {
    tries++;
    try {
        pretendWorkThatMaybeThrows(); // does some work and maybe throws
        break;
    } catch(Throwable throwable) {
        if (!(throwable instanceof CASMismatchException) || tries >= MAX_TRIES) {
            throw throwable; // rethrow exceptions
        }
    }
}
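
If you need this loop in more than one place, it can be wrapped into a small helper. The following is a minimal sketch under stated assumptions, not an SDK facility: SyncRetry and its call method are hypothetical names, and IllegalStateException merely stands in for a transient error such as a CAS mismatch.

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of the Couchbase SDK or RxJava.
final class SyncRetry {

    // Runs the work up to maxTries times. Only exceptions of the given type are
    // retried; any other exception, or the last failure, is rethrown to the caller.
    static <T> T call(Supplier<T> work, Class<? extends RuntimeException> retryable, int maxTries) {
        if (maxTries < 1) {
            throw new IllegalArgumentException("maxTries must be >= 1");
        }
        int tries = 0;
        while (true) {
            tries++;
            try {
                return work.get();
            } catch (RuntimeException ex) {
                if (!retryable.isInstance(ex) || tries >= maxTries) {
                    throw ex; // not retryable, or out of tries: escalate
                }
            }
        }
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        String value = call(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("pretend transient failure");
            }
            return "succeeded after " + calls[0] + " tries";
        }, IllegalStateException.class, 5);
        System.out.println(value);
    }
}
```

Like the loop it wraps, this retries immediately, so the warnings about retrying without delay apply here too.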

Retry with delay

When applying a retry with delay, the main question you need to ask yourself is: how often and for how long is it feasible to retry before giving up and escalating the error? This highly depends on the type of operation, use case and SLA that the application requires, but the techniques are the same.

RxJava provides the retryWhen operator, which allows you more flexibility on the actions performed as well as when the resubscription is happening. This chapter covers the different delay approaches based on this operator.

Here is the contract for retryWhen that you should always keep in mind:

  1. It comes into play when an error happens on the source Observable.
  2. The function provided is called with an Observable that emits these errors.
  3. If you make this Observable error, it is propagated downstream (without retrying).
  4. If you make this Observable complete, it is propagated downstream (without retrying).
  5. In all other cases, a retry will happen.

The easiest approach is the fixed delay. The source observable will be resubscribed after a specified amount of time and for a fixed maximum number of times.

Because the nested logic is a bit harder to understand in the first place, let's talk through it step by step and then put it together.

The Observable passed into our retryWhen function emits an item every time an error happens on the source Observable. If we wanted to retry forever, once every second, it can look like this:

.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
    @Override
    public Observable<?> call(Observable<? extends Throwable> errorNotification) {
        return errorNotification.flatMap(new Func1<Throwable, Observable<?>>() {
            @Override
            public Observable<?> call(Throwable throwable) {
                return Observable.timer(1, TimeUnit.SECONDS);
            }
        });
    }
})

We flatMap our notification Observable and utilize Observable#timer to defer emitting a new event for a second. Now since we need to stop at some point (after a given number of tries), we can utilize the Observable#zipWith operator to zip our error stream together with a range where we specify the number of tries we want to allow. Zipping has the nice side effect that once one of the Observables completes, the resulting Observable will also complete, which triggers Rule 4 from above.

So a modified version looks like:

.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
    @Override
    public Observable<?> call(Observable<? extends Throwable> errorNotification) {
        return errorNotification
            .zipWith(Observable.range(1, 4), new Func2<Throwable, Integer, Integer>() {
                @Override
                public Integer call(Throwable throwable, Integer attempts) {
                    return attempts;
                }
            })
            .flatMap(new Func1<Integer, Observable<?>>() {
                @Override
                public Observable<?> call(Integer attempts) {
                    return Observable.timer(1, TimeUnit.SECONDS);
                }
            });
    }
}) 

Technically, we don't need the zip function here because we ignore its result later on, but it is required for the zipWith operator to work. We use the Observable#range operator to create an Observable that emits 4 events (range(1, 4) emits 1 through 4) and then completes, so we will never end up with more than 4 retries.

There is one more enhancement needed: the code as it stands there will swallow the originating exception when moving on, which is not good because it should be propagated if it can't be handled in this code block.

The following code is modified so that the function of zipWith does not only return the attempt count, but also the throwable, so we have access to it in the flatMap method. For this, the Java client has a generic Tuple we can utilize. In the flatMap we check for the number of attempts and if we are over our threshold we rethrow the exception. Keep in mind that we need to change Observable#range call to MAX_ATTEMPTS+1, to give our code a chance to be called again one final time.

.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
    @Override
    public Observable<?> call(Observable<? extends Throwable> errorNotification) {
        return errorNotification
            .zipWith(Observable.range(1, 5), new Func2<Throwable, Integer, Tuple2<Throwable, Integer>>() {
                @Override
                public Tuple2<Throwable, Integer> call(Throwable throwable, Integer attempts) {
                    return Tuple.create(throwable, attempts);
                }
            })
            .flatMap(new Func1<Tuple2<Throwable, Integer>, Observable<?>>() {
                @Override
                public Observable<?> call(Tuple2<Throwable, Integer> attempt) {
                    if (attempt.value2() == 5) { // MAX_ATTEMPTS + 1: all 4 retries are used up
                        return Observable.error(attempt.value1());
                    }
                    return Observable.timer(1, TimeUnit.SECONDS);
                }
            });
    }
})

If you want to enhance it even further, you can add one more if() clause in the flatMap to see if the throwable that is passed down is actually one we want to retry.
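
One way to keep that extra check readable is to move the decision out of the flatMap into a small plain-Java function, which can then be tested without RxJava. The sketch below is an assumption about how this could be structured: RetryDecision, GIVE_UP and the parameter names are hypothetical, and IllegalStateException only stands in for an error you consider transient.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper; the name and signature are assumptions made for this sketch.
final class RetryDecision {

    // Returned when the error should be propagated instead of retried.
    static final long GIVE_UP = -1L;

    // Decides what to do after failure number `attempt` (1-based): returns the delay
    // in milliseconds before the next try, or GIVE_UP if the error should be escalated.
    static long delayMillis(Throwable error, int attempt, int maxAttempts,
                            Class<? extends Throwable> retryable) {
        if (!retryable.isInstance(error)) {
            return GIVE_UP; // not an error type we consider transient
        }
        if (attempt > maxAttempts) {
            return GIVE_UP; // threshold exceeded, escalate
        }
        return TimeUnit.SECONDS.toMillis(1); // fixed one-second delay
    }

    public static void main(String[] args) {
        Throwable transientError = new IllegalStateException("pretend transient error");
        System.out.println(delayMillis(transientError, 1, 4, IllegalStateException.class));
        System.out.println(delayMillis(new NullPointerException(), 1, 4, IllegalStateException.class));
    }
}
```

Inside the flatMap, a result of GIVE_UP would map to Observable.error(attempt.value1()), and any other value to Observable.timer(delay, TimeUnit.MILLISECONDS).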

Functionality like this is a great candidate to be generic and encapsulated, so we are working on providing similar functionality out of the box. If you are using Java 8 already, the code becomes more condensed as well:

.retryWhen(notification ->
    notification
    .zipWith(Observable.range(1, 5), Tuple::create)
    .flatMap(att ->
            att.value2() == 5 ? Observable.error(att.value1()) : Observable.timer(1, TimeUnit.SECONDS)
    )
)

Here are the variations for linear, exponential and random delays:

Linear:

// Utilizes the number of attempts for the number of seconds to wait
.retryWhen(notification ->
    notification
    .zipWith(Observable.range(1, 5), Tuple::create)
    .flatMap(att ->
            att.value2() == 5 ? Observable.error(att.value1()) : Observable.timer(att.value2(), TimeUnit.SECONDS)
    )
)

Exponential:

// Uses the timer with 2^attempts to generate exponential delays
.retryWhen(notification ->
    notification
    .zipWith(Observable.range(1, 5), Tuple::create)
    .flatMap(att ->
            att.value2() == 5 ? Observable.error(att.value1()) : Observable.timer(1 << att.value2(), TimeUnit.SECONDS)
    )
)

Random:

// Random delay between 0 and 4 whole seconds per attempt (nextInt(5) excludes 5)
.retryWhen(notification ->
    notification
    .zipWith(Observable.range(1, 5), Tuple::create)
    .flatMap(att ->
            att.value2() == 5 ? Observable.error(att.value1()) : Observable.timer(new Random().nextInt(5), TimeUnit.SECONDS)
    )
)

With synchronous code, there are not many options other than using Thread.sleep() to keep the current thread waiting until the loop is allowed to proceed:

// Linear Backoff
int tries = 0;
while(true) {
    tries++;
    try {
        pretendWorkThatMaybeThrows(); // does some work and maybe throws
        break;
    } catch(Throwable throwable) {
        if (!(throwable instanceof CASMismatchException) || tries >= MAX_TRIES) {
            throw throwable; // rethrow exceptions
        }
    }

    Thread.sleep(TimeUnit.SECONDS.toMillis(tries));
}

You can then apply the same approaches as in the asynchronous case to the Thread.sleep() time to accommodate a static, linear, exponential or random delay.
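
The four delay flavours differ only in how the sleep time is computed from the attempt number, so they can be sketched as plain functions. BackoffDelays and its method names are hypothetical, and the cap on the exponential variant is an assumption added here to keep waits bounded; it is not something the SDK prescribes.

```java
import java.util.Random;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: computes how long to sleep before retry number `attempt` (1-based).
final class BackoffDelays {

    // Static delay: always one second, regardless of the attempt.
    static long fixedMillis(int attempt) {
        return TimeUnit.SECONDS.toMillis(1);
    }

    // Linear delay: 1s, 2s, 3s, ...
    static long linearMillis(int attempt) {
        return TimeUnit.SECONDS.toMillis(attempt);
    }

    // Exponential delay: 2s, 4s, 8s, ... capped at 2^10 seconds (an arbitrary choice).
    static long exponentialMillis(int attempt) {
        int exponent = Math.max(0, Math.min(attempt, 10));
        return TimeUnit.SECONDS.toMillis(1L << exponent);
    }

    // Random delay: a whole number of seconds from 0 (inclusive) to maxSeconds (exclusive).
    static long randomMillis(Random random, int maxSeconds) {
        return TimeUnit.SECONDS.toMillis(random.nextInt(maxSeconds));
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 4; attempt++) {
            System.out.println(attempt + ": linear=" + linearMillis(attempt)
                + "ms exponential=" + exponentialMillis(attempt) + "ms");
        }
    }
}
```

In the loop above, Thread.sleep(TimeUnit.SECONDS.toMillis(tries)) would then become, for example, Thread.sleep(BackoffDelays.exponentialMillis(tries)).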

Fallback

Instead of (or in addition to) retrying, another valid option is falling back to either a different Observable or a default value.

RxJava provides you with different operators, prefixed with onError*():

  • onErrorReturn(Func1<Throwable, T>): Called when the source Observable errors and allows you to return custom data instead.
  • onErrorResumeNext(Observable<?>): Called when the source Observable errors and allows you to resume transparently with a different Observable.
  • onErrorResumeNext(Func1<Throwable, Observable<?>>): Called when the source Observable errors and allows you to transparently resume with an Observable (based on a specific Throwable).

You should use onErrorReturn if you want to fall back to static data quickly. For example:

Observable
   .<String>error(new Exception("I failed"))
   .onErrorReturn(new Func1<Throwable, String>() {
       @Override
       public String call(Throwable throwable) {
           // You could return data based on the throwable as well
           return "Default";
       }
   })
   .subscribe();

If you only want to return default values based on a specific exception or even call another Observable as fallback, onErrorResumeNext is for you.

Observable
    .<String>error(new TimeoutException("I failed"))
    .onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() {
        @Override
        public Observable<? extends String> call(Throwable throwable) {
            if (throwable instanceof TimeoutException) {
                return Observable.just("Default");
            }
            // Forward anything other than the TimeoutException
            return Observable.error(throwable);
        }
    })
    .subscribe();

If you just want to fall back onto another Observable that you have in scope without caring about the Exception, you can use the other onErrorResumeNext() overload. For example, this loads data from all replicas if the get() call did not succeed with the Java SDK:

bucket
    .async()
    .get("id")
    .onErrorResumeNext(bucket.async().getFromReplica("id", ReplicaMode.ALL))
    .subscribe();

Synchronous fallbacks can be implemented by conditionally setting the default in the catch clause:

String value;
try {
    value = pretendWorkThatMaybeThrows();
} catch(Exception ex) {
    value = "Default";
}

Here is the gotcha though: this only works well if the fallback is static. If you need to fall back to another database call, for example, you quickly get into nested error handling, and timeouts are a pain to handle (since they accumulate with every synchronous call). We recommend that you use asynchronous fallbacks and then block at the very end through toBlocking().single() or equivalents.

Defaults

Another case that requires intervention on the application side is an Observable that does not emit a single value. This can be because operators filtered the Observable so that nothing is left, or because it did not produce any values in the first place. One common case in the Java SDK is get() and its cousins. If the Document is not found, the Observable will complete without emitting anything.

RxJava provides helper operators which all end with *OrDefault() and allow you to return default values if no item is emitted when the Observable completes.

In most cases you want to use singleOrDefault() and return a default value when not a single item is emitted by the source Observable:

Observable
   .<String>empty()
   .singleOrDefault("Default")
   .subscribe();

If you are dealing with potentially more than one item emitted in your Observable and you only want to emit either the first or the last value, there are also operators that allow you to emit a default if it's unexpectedly empty. See firstOrDefault() as well as lastOrDefault() for more details.
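
To make the difference between the three operators concrete, here is how their semantics could be written for an ordinary Iterable. This is a plain-Java illustration only, with Defaults as a hypothetical class name; the "too many elements" failure mirrors what we understand singleOrDefault() to do in RxJava when the source emits more than one item.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

// Illustration of the *OrDefault semantics on an Iterable; not RxJava code.
final class Defaults {

    // Empty: the default. Exactly one item: that item. More than one: an error.
    static <T> T singleOrDefault(Iterable<T> items, T defaultValue) {
        Iterator<T> it = items.iterator();
        if (!it.hasNext()) {
            return defaultValue;
        }
        T only = it.next();
        if (it.hasNext()) {
            throw new IllegalArgumentException("Sequence contains too many elements");
        }
        return only;
    }

    // Empty: the default. Otherwise the first item; later items are ignored.
    static <T> T firstOrDefault(Iterable<T> items, T defaultValue) {
        Iterator<T> it = items.iterator();
        return it.hasNext() ? it.next() : defaultValue;
    }

    // Empty: the default. Otherwise the last item.
    static <T> T lastOrDefault(Iterable<T> items, T defaultValue) {
        T last = defaultValue;
        for (T item : items) {
            last = item;
        }
        return last;
    }

    public static void main(String[] args) {
        System.out.println(singleOrDefault(Collections.<String>emptyList(), "Default"));
        System.out.println(firstOrDefault(Arrays.asList("a", "b"), "Default"));
        System.out.println(lastOrDefault(Arrays.asList("a", "b"), "Default"));
    }
}
```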

Error Handling in Bulk Scenarios

Bulk operations are used to handle more data in one batch (and therefore benefit from better resource utilization), but error handling becomes more complicated. There are three high level cases to consider:

  • "Best Effort": keep going and just use the results that succeeded when errors happen.
  • "Full Retry": Retry the complete Observable when errors happen.
  • "Incremental Retry": Only retry specific events of the Observable.

Before we dig into the specific approaches, let's revisit the contract of Observables:

onEvent* (onError | onComplete)

Zero or more events are followed by either an error event or a complete event. This provides an important clue right away: once our observable fails, no more events can be passed through. Therefore, you need to make sure that errors are handled at the smallest scope possible, only letting it proceed if you cannot handle it right away.

Best effort bulk handling

Sometimes it is more important to get data within a timeframe (maybe with a short timeout) than to get all data. Of course, in general you want to keep a timeout that lets you fetch all the data, but depending on the use case you may be fine with only getting a subset of the data returned.

To ignore errors and turn them into "noops", you can utilize onErrorResumeNext():

Observable
    .create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("A");
            subscriber.onNext("B");
            subscriber.onError(new IllegalStateException("Woops"));
            subscriber.onNext("C");
        }
    })
    .onErrorResumeNext(Observable.<String>empty())
    .toBlocking()
    .forEach(new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println("Got: " + s);
        }
    });

This will not raise any exception to your calling thread, but it will never process "C", because based on the Observable contract, once onError is called, no more events are allowed to be generated.

In order to keep going when individual events fail, you need to turn each event into its own Observable and then merge them back together. You can either defer the error handling (through Observable#mergeDelayError), or use flatMap and make sure to contain the errors before flattening.

This code provides some fake data you can work with. It will emit 4 observables where one of them will fail:

Observable<Observable<String>> dataObservables = Observable
    .just("a", "b", "c", "d")
    .map(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String s) {
            if (s.equals("b")) {
                return Observable.error(new IllegalStateException("I dont like b"));
            }
            return Observable.just(s);
        }
    });

You can then use mergeDelayError to defer error handling until the end. If you uncomment the onErrorResumeNext it will silently discard any errors as well, leaving you with an Observable that provides best-effort processing of source Observables.

Observable.mergeDelayError(dataObservables)
    //.onErrorResumeNext(Observable.<String>empty())
    .toBlocking()
    .forEach(new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println(s);
        }
    });

Alternatively, you can use flatMap and make sure the errors are contained for each emitted Observable:

Observable
    .just("a", "b", "c", "d")
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String s) {
            // Simulate some observable that sometimes fails
            Observable<String> obs;
            if (s.equals("b")) {
                obs = Observable.error(new IllegalStateException("I dont like b"));
            } else {
                obs = Observable.just(s);
            }

            // Here comes the actual handling part before we flatten it back
            return obs.onErrorResumeNext(Observable.<String>empty());
        }
    })
    .toBlocking()
    .forEach(new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println(s);
        }
    });

As a more practical example, here is a best effort bulk loading of documents from Couchbase Server that just discards operations that fail:

// bucket must be final so the anonymous inner class below can capture it on Java 7
private static Observable<JsonDocument> loadDocsBestEffort(final Bucket bucket, List<String> ids) {
    return Observable
        .from(ids)
        .flatMap(new Func1<String, Observable<JsonDocument>>() {
            @Override
            public Observable<JsonDocument> call(String id) {
                return bucket
                    .async()
                    .get(id)
                    .onErrorResumeNext(Observable.<JsonDocument>empty());
            }
        });
}

Of course, you can add more logic onto each emitted Observable and also proper logging so that you at least know which errors happened.
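
For completeness, a blocking equivalent of the best-effort idea that also records which ids failed could look like the following. It is a sketch in plain Java: BestEffortLoader and loadAll are hypothetical names, and the loader function stands in for whatever synchronous call fetches a single document.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: synchronous best-effort loading that remembers failures.
final class BestEffortLoader {

    // Loads every id with the given loader. A failing id does not abort the batch;
    // its exception is stored in `failures` so it can be logged or retried later.
    static <T> List<T> loadAll(List<String> ids, Function<String, T> loader,
                               Map<String, RuntimeException> failures) {
        List<T> loaded = new ArrayList<>();
        for (String id : ids) {
            try {
                loaded.add(loader.apply(id));
            } catch (RuntimeException ex) {
                failures.put(id, ex); // contain the error at the smallest scope
            }
        }
        return loaded;
    }

    public static void main(String[] args) {
        Map<String, RuntimeException> failures = new LinkedHashMap<>();
        List<String> loaded = loadAll(Arrays.asList("a", "b", "c", "d"), id -> {
            if (id.equals("b")) {
                throw new IllegalStateException("I dont like b");
            }
            return id;
        }, failures);
        // prints "Loaded: [a, c, d], failed: [b]"
        System.out.println("Loaded: " + loaded + ", failed: " + failures.keySet());
    }
}
```

Keeping the failures around, instead of silently dropping them, is also what makes an incremental retry possible later on.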

Full retry bulk handling

TODO

Incremental retry bulk handling

TODO

Error Mitigation

Slowing Down

  • batching ops together with operators

Load Shedding

  • semaphores
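
Until this section is written, here is one way the semaphore idea could look. This is only a sketch under our own assumptions, not the approach the final documentation will necessarily take: LoadShedder and callOrShed are hypothetical names, and the caller decides what value to return when work is rejected.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical helper: rejects work immediately once too many calls are in flight.
final class LoadShedder {

    private final Semaphore permits;

    LoadShedder(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // Runs the work if a permit is free; otherwise returns shedValue without waiting.
    <T> T callOrShed(Supplier<T> work, T shedValue) {
        if (!permits.tryAcquire()) {
            return shedValue; // overloaded: shed this call instead of queueing it
        }
        try {
            return work.get();
        } finally {
            permits.release();
        }
    }

    public static void main(String[] args) {
        LoadShedder shedder = new LoadShedder(1);
        // The outer call holds the only permit, so the nested call is shed.
        String result = shedder.callOrShed(
            () -> shedder.callOrShed(() -> "inner ran", "inner shed"),
            "outer shed");
        System.out.println(result); // prints "inner shed"
    }
}
```

Using tryAcquire() rather than acquire() is the point of the exercise: a caller that cannot get a permit fails fast instead of piling up behind an already overloaded system.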

Error Causes

This chapter discusses errors that need to be covered by the application developer to make the application resilient.

Timeouts

  • worst nightmare and safety net at the same time.
  • so many possibilities for timeouts
  • how to apply timeouts to async workflows
  • how timeouts are used in the sync wrappers
  • difference between global timeout and single-timeout ops

Backpressure

  • when can it happen and what can you do against it?

Operation Effects

  • how to handle errors that come because of inserts, locks, cas mismatches,...

Unstable Cluster State

  • which errors can come up during rebalance, failover or even a failing node (before failover) and how to handle that
@tom-cb

tom-cb commented Dec 3, 2014

@benjchristensen I'd actually specifically asked if we could get Java 7 examples, as there's still a lot of people who want to use this that aren't quite ready for Java 8 yet. Also, although the Java 8 style is much more compact and easy to read, I think it can be a little difficult to get what is going on if it's your first exposure to it -- learning both Rx style and Java 8 style in one go.

Maybe the best thing here would be if we can get buttons that let you switch between java 7 and java 8 examples. Would that make sense, or would the surrounding text have to be too generic for that to work in practice? (maybe that's exactly what you were suggesting anyway??)

@ivansanchezvera

Just what I needed, error handling is key for my application. I am new to Rx, we are trying couchbase with the java client. This document is great, should be indexed in the couchbase documentation site as soon as possible. Great job @daschl

@daschl
Author

daschl commented Dec 19, 2014

thanks! it will be soon :)

@mcpaddy

mcpaddy commented Feb 26, 2015

Typo in the 1st sentence under Best effort bulk handling section:

Sometimes it is more important to get data in a timeframe (maybe with a short timeout) then getting all data.

then should be than.

@gfrison

gfrison commented Jul 2, 2015

Thanks for the useful document.
How can I retry (retrywhen with interval period) only on specific exceptions?

Thank you

@AlexBonel

We use the Observable#range operator to create an Observable that emits 3 events and then completes, so we will never end up with more retries.

Looks like 4 events in this case Observable.range(1, 4) -- n...n+m-1 according to [docs](http://reactivex.io/RxJava/javadoc/rx/Observable.html#range%28int, int%29). Am I right?
