Durability Requirements

Introduction

If you do not specify any durability requirements (and we'll see how in a bit), the server will respond with a success message as soon as the data has been acknowledged and processed in the managed cache. Because persistence and replication are asynchronous tasks that happen eventually, there is a time gap during which a node failure can lead to data loss: the window in which the data has neither been replicated to another node nor persisted to disk.

For most use cases this time gap is perfectly acceptable (it is usually very small), but for important mutation operations we need stronger guarantees. Making the server only acknowledge a mutation once it has been replicated or persisted would slow down the whole system by default, so Couchbase handles those situations differently. Here is how it works:

  1. The SDK will perform the normal mutation without durability requirements.
  2. If the server returns with a successful response, the client will start polling.
  3. The client polls all the affected nodes (more on this in a bit) for this mutation until either the desired state is reached or it turns out it cannot be reached.
  4. In the successful case the operation completes towards the application layer; in the failure case the client errors out the operation, leaving the user to decide what's next.

The following code will make sure that the document has been persisted on the active node for this document ID and also replicated to one of the configured replicas:

JsonDocument docPersistedAndReplicated = bucket.upsert(docToStore, PersistTo.MASTER, ReplicateTo.ONE);

In this example the client will poll two nodes until completion: the active node for this document and also the configured replica. If any of the constraints cannot be fulfilled, an exception will be raised. We'll cover the specific error scenarios later in this chapter.

In case you wonder why we put the burden on you to figure out what to do next in the failure case: the SDK has no idea of your SLAs or your intent when the operation fails. Sometimes it might be fine to proceed and log the error; in other cases you may want sophisticated retry mechanisms, where the SDK can guide you with functionality but not the actual execution semantics.

PersistTo and ReplicateTo

You specify durability requirements by passing the PersistTo and/or ReplicateTo enums as parameters on mutation operations.

The following states can be set, and all combinations between the two are supported. The NONE states are the defaults, in which case no durability requirement is applied. NONE is nonetheless useful if you derive the requirement from a system property and need a fallback to the default (a sketch follows further down).

  • PersistTo: NONE, MASTER, ONE, TWO, THREE, FOUR
  • ReplicateTo: NONE, ONE, TWO, THREE

Since you can have a maximum of three replicas, the maximum number of replicas you can check is three as well. For persisting, the maximum number is four, because the active node can be checked in addition to the three replicas.

All options (other than NONE and MASTER) will return as soon as the requirement is fulfilled. So, for example, if you have two replicas configured on the bucket and you specify ReplicateTo.ONE, the condition is considered fulfilled as soon as either of the two replicas reports success for the document ID.

If you provide both PersistTo and ReplicateTo, both conditions are linked together with a logical "and", meaning that both must be satisfied for the SDK to report success.
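
Since both requirements also accept NONE, they can be driven entirely by configuration. Here is a minimal sketch, assuming made-up system property names (durability.persistTo and durability.replicateTo) that fall back to the defaults when unset:

PersistTo persistTo = PersistTo.valueOf(System.getProperty("durability.persistTo", "NONE"));
ReplicateTo replicateTo = ReplicateTo.valueOf(System.getProperty("durability.replicateTo", "NONE"));

// With both properties unset this behaves like a plain mutation without durability requirements.
JsonDocument stored = bucket.upsert(docToStore, persistTo, replicateTo);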

Keep in mind that even if the durability requirement fails, the original mutation operation may very well have succeeded. This is because the actual mutation and the subsequent polling are individual operations which can also fail individually (see the common failure scenarios below for more context). As an example, if you specify ReplicateTo.ONE and you have no replica configured, the original mutation will complete without issues, but the durability requirement will fail.

API Support

The following mutation operations provide support for durability requirements:

  • insert
  • upsert
  • replace
  • remove

Here is an example which inserts a document and waits until it is persisted on the active node:

bucket.insert(document, PersistTo.MASTER);

The following example replaces a document and waits until it is replicated to at least one replica:

bucket.replace(document, ReplicateTo.ONE);

While not obvious at first, the same holds true for removal operations. If you want to make sure that the deletion of the document survives node failures under any circumstances, you can use the same semantics. The following example makes sure that the removal is persisted on at least two nodes in the cluster (such as the master and one replica):

bucket.remove("docid", PersistTo.TWO);

In this last example also specifying ReplicateTo.ONE would be redundant, since persisting on a replica means it first needs to be replicated to it.

Exceptions and Errors

The asynchronous API will send error notifications to your subscriber in the failure case. Since the synchronous API just wraps the async one, those errors will be converted into actual exceptions.

Every exception that happens during the durability requirement polling will be wrapped into a com.couchbase.client.java.error.DurabilityException. The original exception is carried as part of the DurabilityException, so you can always call Throwable#getCause() on it.

Once you do that, you will come across the following errors (more on their root causes in the next sections):

  • DocumentConcurrentlyModifiedException: The observed document has been concurrently modified by another caller.
  • ReplicaNotConfiguredException: There are not enough replicas configured to fulfill the durability requirement in the first place.
  • ReplicaNotAvailableException: There are currently not enough replicas available in the cluster to fulfill the durability requirement.
  • DocumentMutationLostException: The mutation has been lost during a hard failover. Only applies to enhanced durability.

Note that some of these errors only affect a subset of the documents, for example when a node has failed over and no replica is available for its partitions at that point. Other errors affect all documents, for example when no replica is configured on the bucket at all.

Keep in mind that because the polling for durability happens after the original mutation, any failure of the mutation itself will propagate immediately and never trigger the polling in the first place. This means that if you specify durability requirements, you need to handle the polling errors in addition to the errors of the original mutation.
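
To illustrate, here is a minimal sketch on the blocking API that unwraps the root cause of a DurabilityException (the bucket and document are assumed to exist):

try {
    bucket.upsert(docToStore, PersistTo.MASTER, ReplicateTo.ONE);
} catch (DurabilityException ex) {
    Throwable cause = ex.getCause();
    if (cause instanceof ReplicaNotConfiguredException) {
        // Permanent setup issue: the bucket does not have enough replicas configured.
        throw ex;
    } else if (cause instanceof ReplicaNotAvailableException) {
        // Not enough replicas reachable right now; retrying later may succeed.
    } else {
        // Other causes, for example DocumentConcurrentlyModifiedException.
    }
}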

Failure Modes and their Impact

Retry strategies are an advanced feature of the SDK, but in the context of durability requirements they become even more important. By default, the SDK uses the BestEffortRetryStrategy, but you can plug in a different one (commonly the FailFastRetryStrategy) or even write your own. If you want to change it, you can do so on the environment like this:

CouchbaseEnvironment env = DefaultCouchbaseEnvironment
    .builder()
    .retryStrategy(FailFastRetryStrategy.INSTANCE)
    .build();

Keep in mind that since this is a global setting, all operations will be affected by it. Now, what does it change in the context of durability requirements?

If the SDK detects an issue during the polling activity it will either continue to retry (best effort) or bail out immediately (fail fast). So if a node goes down (see below for more info) and does not come back up quickly enough, with best effort you'll hit the specified client-side timeout, while with fail fast you'll very quickly get a DurabilityException which contains the root cause, for example a RequestCancelledException.

So if you use the fail fast retry strategy, you are trading more complex retry code on your side for faster feedback cycles in the failure case. A common reason to enable fail fast is if you use some more sophisticated libraries on top of the SDK, for example Hystrix.

Common Failure Scenarios

Because durability constraints in general span more than one node, the statistical chance of operation failures is higher. As a result, it is even more important for production stability to consider what can go wrong and how to react.

This chapter focuses on failures which come up because of invalid cluster setup, while the next chapter covers node failures (like a node failing because of a power outage or hardware failure).

Many durability requirement failures happen because the bucket doesn't have enough replicas configured in the first place, or because the number of nodes is not sufficient (for example, 2 replicas configured but only 2 nodes in the cluster).

So if you have one replica configured on the bucket but you issue a mutation with ReplicateTo.TWO, you'll get the following error:

Exception in thread "main" com.couchbase.client.java.error.DurabilityException: Durability requirement failed: Not enough replicas configured on the bucket.
	at com.couchbase.client.java.CouchbaseAsyncBucket$18$1.call(CouchbaseAsyncBucket.java:549)
	at com.couchbase.client.java.CouchbaseAsyncBucket$18$1.call(CouchbaseAsyncBucket.java:545)
	at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1.onError(OperatorOnErrorResumeNextViaFunction.java:99)
	...
Caused by: com.couchbase.client.core.ReplicaNotConfiguredException: Not enough replicas configured on the bucket.
	at com.couchbase.client.core.message.observe.ObserveViaCAS$6$2.call(ObserveViaCAS.java:144)
	at com.couchbase.client.core.message.observe.ObserveViaCAS$6$2.call(ObserveViaCAS.java:136)
	at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)

When you have a replica configured on the bucket but not enough nodes in the cluster to actually replicate the data, the behaviour depends on the retry strategy. By default (best effort) you'll get a timeout, because the client retries as long as possible (since a node can be added to the cluster at runtime):

Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.TimeoutException
	at com.couchbase.client.java.util.Blocking.blockForSingle(Blocking.java:75)
	at com.couchbase.client.java.CouchbaseBucket.upsert(CouchbaseBucket.java:375)
	at com.couchbase.client.java.CouchbaseBucket.upsert(CouchbaseBucket.java:370)
	at test.MainTest.main(MainTest.java:30)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.util.concurrent.TimeoutException
	... 9 more

If you choose fail fast, the error will be a DurabilityException which contains a ReplicaNotAvailableException:

Exception in thread "main" com.couchbase.client.java.error.DurabilityException: Durability requirement failed: Replica number 1 not available for bucket default
	at com.couchbase.client.java.CouchbaseAsyncBucket$18$1.call(CouchbaseAsyncBucket.java:549)
	at com.couchbase.client.java.CouchbaseAsyncBucket$18$1.call(CouchbaseAsyncBucket.java:545)
	at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1.onError(OperatorOnErrorResumeNextViaFunction.java:99)
	at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:49)
	at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:49)
	at rx.internal.operators.OperatorTake$1.onError(OperatorTake.java:62)
	...
Caused by: com.couchbase.client.core.ReplicaNotAvailableException: Replica number 1 not available for bucket default
	at com.couchbase.client.core.node.locate.KeyValueLocator.errorObservables(KeyValueLocator.java:202)
	at com.couchbase.client.core.node.locate.KeyValueLocator.locateForCouchbaseBucket(KeyValueLocator.java:127)
	at com.couchbase.client.core.node.locate.KeyValueLocator.locate(KeyValueLocator.java:79)
	...

Note the subtle difference between a ReplicaNotConfiguredException and a ReplicaNotAvailableException. It lets you quite effectively troubleshoot whether your cluster is not set up properly or is suffering from a temporary failure condition.

Understanding and Handling Node Failures

In addition to wrong cluster setup, the most common issue with durability requirements is the cluster topology changing at runtime in a way that makes it impossible to fulfill the requirements anymore.

In practice, this is commonly referred to as a "node failure". It can happen because the couchbase-server service is restarted, the node is powered down, or it is not reachable across the network. In all cases operations will start to fail, either because the active partitions do not respond (the normal mutation fails) or the replicas don't (the observe polling for the durability requirement fails).

In this state the outcome is nearly the same as in the chapter above. With the best effort strategy you will run into your operation timeouts; in the fail fast case you'll see DurabilityExceptions with the different underlying causes.

The difference, though, is that only a subset of the operations will fail: in a 5-node cluster only roughly one fifth of the mutations are affected, and the durability requirement only fails if the downed node is the only one that could fulfill it. In practice that means that if you have 2 replicas configured, one node is down and you use ReplicateTo.ONE, the other replica can still make the requirement succeed.

Once you manually trigger the node failover or the auto-failover feature kicks in, the replicas are promoted to active and regular mutations on those partitions will start to work again. Here is the gotcha: since those replicas have just been promoted to active, they are no longer available as replicas.

So either you have more than one replica configured (in which case a spare one is still sitting in the cluster and can answer the durability requirement requests), or you'll keep getting ReplicaNotAvailableExceptions.

Once you trigger a rebalance, the cluster manager makes sure enough replicas are in place again and shuffles the data around; at some point the ReplicaNotAvailableExceptions disappear and everything is good again.

What to do when you hit such an exception very much depends on the type of application. If you need to make sure the data is stored and you don't mind waiting for the cluster to settle again, you can retry the operation with an exponential backoff (see the sketch below). If you are running an interactive application, most of the time you need to propagate the error up the stack or fail in a controlled way.
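
Here is a minimal blocking sketch of such a retry loop; the attempt count and backoff values are arbitrary examples, not recommendations:

JsonDocument stored = null;
for (int attempt = 0; attempt < 5 && stored == null; attempt++) {
    try {
        stored = bucket.upsert(docToStore, PersistTo.MASTER, ReplicateTo.ONE);
    } catch (DurabilityException ex) {
        try {
            // Exponential backoff: 100ms, 200ms, 400ms, ...
            Thread.sleep(100L << attempt);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            break;
        }
    }
}
if (stored == null) {
    // Still not stored after all retries: propagate the error or fail in a controlled way.
}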

While you should definitely plan for node failures, in general they don't happen every hour, and if you plan accordingly you can minimize production impact. Strategies like multiple replicas, the EE rack awareness feature and, since 4.0, the transparent enhanced durability (see below) provide a strong foundation for building always-on services, even in the failure case.

Handling Concurrent Document Modifications

One error you might encounter in a concurrent application, even if all nodes are up and running, is the DocumentConcurrentlyModifiedException. To understand its roots, we need to peel back a layer and see how the polling mechanism works.

A successful mutation returns the new CAS value as part of the response. Because the CAS value changes every time the document changes, it can be used to track replication and persistence on the server side. Here is roughly what happens inside the SDK:

  1. The mutation is performed.
  2. If successful, the document ID and the CAS are sent to each participating server through the internal observe command.
  3. All responses are collected; they contain a status (persisted, replicated) and the CAS of the document on each server.
  4. If the CAS value on the master (active) node differs from the one of our mutation, a DocumentConcurrentlyModifiedException is raised.

Why?

Imagine application A mutates document D and gets a CAS returned; then it starts polling. Slightly afterwards application B also mutates document D, and the CAS value changes again. Because the managed cache on the server performs deduplication, and because the document can be replicated between poll cycles, the CAS returned to application A might never be observed since it has already changed to the CAS from application B.

To surface this scenario, the SDK raises a DocumentConcurrentlyModifiedException once the CAS on the master changes. How to react depends on the nature of the application. If, semantically, the later operation subsumes the previous one, ignoring the error might be acceptable. If not, application A needs to fetch the document again and re-apply its changes, or raise an error. In general, error handling is very similar to a CASMismatchException.
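
Here is a minimal sketch of the re-fetch approach; applyChanges stands for a hypothetical application method that re-applies the intended modification to the current content:

try {
    bucket.upsert(docToStore, PersistTo.MASTER, ReplicateTo.ONE);
} catch (DurabilityException ex) {
    if (ex.getCause() instanceof DocumentConcurrentlyModifiedException) {
        // Fetch the current state and re-apply our intended change (applyChanges is hypothetical).
        JsonDocument current = bucket.get(docToStore.id());
        JsonDocument updated = JsonDocument.create(current.id(), applyChanges(current.content()), current.cas());
        bucket.replace(updated, PersistTo.MASTER, ReplicateTo.ONE);
    }
}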

The next section describes the "next generation observe" which removes the possibility of a DocumentConcurrentlyModifiedException happening.

Enhanced Durability Requirements with 4.0+

Couchbase Server 4.0 introduces a new feature which allows the SDK to be more accurate during the observe poll cycle, especially in the concurrent and failover cases. Instead of using the CAS to verify mutations, it uses sequence numbers and partition UUIDs.

To enable them, all you need to do is enable mutation tokens on the environment:

CouchbaseEnvironment env = DefaultCouchbaseEnvironment
    .builder()
    .mutationTokensEnabled(true)
    .build();

Note that the tradeoff is an extra 16 bytes of overhead on every mutation response: each mutation returns the partition UUID and the sequence number, which are then used for the enhanced durability requirements.

As a result, the new MutationToken on the Document will be set and the polling logic will automatically switch to the enhanced requirements. Only enable enhanced durability if all nodes run at least version 4.0, since the SDK will not check whether each individual node supports the new observe option.
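
If you want to verify that tokens are flowing, the returned document exposes them (this sketch assumes an SDK version whose Document interface carries the mutation token):

JsonDocument stored = bucket.upsert(docToStore);
// Prints the partition UUID and sequence number when tokens are enabled, null otherwise.
System.out.println(stored.mutationToken());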

Every mutation on the server side increases the sequence number. Because the SDK uses sequence numbers instead of the CAS value, in the example above, even if applications A and B both update document D (say with sequences 1 and 2), observing sequence 2 guarantees that sequence 1 has also been replicated or persisted.

If a hard failover happens, a new partition UUID is created and the server returns a different response. From this response the SDK can reliably infer whether a mutation has been lost (the replica took over, but the last replicated sequence did not include the mutation we have been polling for). In this case a DocumentMutationLostException is raised. It is generally recommended that the application re-perform the operation when this exception is encountered to avoid data loss.

Performance Considerations

Couchbase is widely recognized for its excellent and predictable performance. One of the reasons for that is its managed cache, which allows it to return a response very quickly without having to take replication or persistence latency into account.

Again, it's all tradeoffs. If you need to make sure data is replicated and/or persisted, your network or disk performance becomes the dominant factor. If you need high throughput together with durability requirements, make sure (and measure) that you have fast disks (SSDs) and/or a fast network.

Because more than one node is in general involved and more roundtrips are needed, think about realistic timeouts you want to set and measure them in production. If you hit performance issues, make use of the built-in metrics feature to gather operation latency percentile information and adjust the timeouts based on those measurements. Any timeout you set on the blocking API needs to cover the original mutation and all subsequent polls until the durability requirement is met.
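
On the blocking API the timeout can also be passed per operation; the following sketch assumes such an overload and uses an illustrative 5-second value:

JsonDocument stored = bucket.upsert(docToStore, PersistTo.MASTER, ReplicateTo.ONE, 5, TimeUnit.SECONDS);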

There is one final knob you can tune on the environment: the observeIntervalDelay. It controls the delay between subsequent polls. By default, an exponential delay between 10 microseconds and 100 milliseconds is used. That way there are a few very quick polls followed by some with a longer pause, with the ceiling at 100 ms (so the delay does not grow exponentially out of bounds). The following example fixes it at 50 microseconds:

CouchbaseEnvironment env = DefaultCouchbaseEnvironment
    .builder()
    .observeIntervalDelay(Delay.fixed(50, TimeUnit.MICROSECONDS))
    .build();

Tune the delay according to your network and disk performance characteristics: polling too frequently just overloads the network unnecessarily, while too long a poll delay increases the latency of the overall operation (and consequently reduces application throughput). If you are uncertain, work with Couchbase Support to find the optimal settings for your environment.
