Whirl Wind Tour of BigCouch Sources


This document aims to give a brief tour of the BigCouch source tree as well as some background on some of the included applications. It is a fairly high-level overview meant to get CouchDB developers familiar enough with the code to know where to start looking when they want to work on clustered code.

This is mostly aimed at developers who are already familiar with CouchDB and its various internals, as an on-boarding document for the newly added clustering layer. As such, most of the existing CouchDB code is elided from this discussion.

Clustering Basics

A BigCouch cluster is a fairly straightforward implementation of consistent hashing which relies on CouchDB's existing replication semantics for the "hard parts" of clustering (i.e., anti-entropy).

In BigCouch a "database" is actually a collection of many "CouchDB databases". To make the distinction we refer to the underlying "CouchDB databases" as shards when talking about a clustered database.

A clustered database has two main parameters when it's created: N, the number of copies of each shard range, and Q, the number of shard ranges. A clustered database will then have N * Q shards on disk. It's important to note that N is limited by the number of nodes in the cluster, as there's not much sense in creating two copies on the same node. By default N is 3 and Q is usually 4 or 8. For databases that are expected to grow significantly, a larger Q should be chosen. Changing Q is hard. Changing N is not so hard, but N should always be at least 3.
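To make the N and Q arithmetic concrete, here is a minimal sketch of how a hash space might be split into Q ranges and how a document ID could be mapped to one of them. The 32-bit ring, the use of CRC32, and all function names are assumptions for illustration only; the real logic lives in mem3 and may differ in detail.

```python
import zlib

RING_SIZE = 2 ** 32  # assumption: a 32-bit hash space


def shard_ranges(q):
    """Split the hash space into q contiguous (begin, end) ranges."""
    step = RING_SIZE // q
    ranges = [(i * step, (i + 1) * step - 1) for i in range(q)]
    # Make sure the last range reaches the end of the ring even when
    # q does not divide the ring size evenly.
    ranges[-1] = (ranges[-1][0], RING_SIZE - 1)
    return ranges


def range_for_doc(doc_id, q):
    """Return the range a document ID hashes into (CRC32 is an assumption)."""
    h = zlib.crc32(doc_id.encode("utf-8"))
    for begin, end in shard_ranges(q):
        if begin <= h <= end:
            return (begin, end)


def shard_count(n, q):
    """Total number of shard files on disk for a clustered database."""
    return n * q
```

With the defaults mentioned above, `shard_count(3, 8)` gives 24 shards spread across the cluster, and every document ID lands in exactly one of the 8 ranges.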

When a user makes a read or write request they provide what's called the quorum parameter. This is named either r or w depending on whether it's a read or a write. This value determines how many responses will be considered before returning to the client. Given N = 3, r and w can take the values 1, 2, or 3. The default for both r and w is 2 for normal document operations.
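As a rough illustration of the quorum parameter, the sketch below consumes worker responses in arrival order and stops as soon as the requested number has been seen. The function name and the plain iterator of responses are hypothetical; the actual coordinators in fabric do considerably more work when comparing replies.

```python
def wait_for_quorum(responses, quorum):
    """Collect responses in arrival order until `quorum` have been seen.

    `responses` is any iterable that yields worker replies. Replies that
    arrive after the quorum is met are left unconsumed.
    """
    seen = []
    for response in responses:
        seen.append(response)
        if len(seen) >= quorum:
            return seen
    raise RuntimeError(
        "only %d of %d required responses arrived" % (len(seen), quorum)
    )
```

With N = 3 and r = 2, the coordinator in this sketch returns after the second reply without waiting for the third.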

Views, on the other hand, are always the equivalent of an r=1 request, as there's no direct method to resolve differences between all combinations of shards that produce any given view response.

Core Clustering Apps

There are four core applications added to facilitate the clustering layer:

  • rexi - An RPC framework similar to rex
  • mem3 - Maintains cluster state information
  • fabric - The clustered API
  • chttpd - An HTTP interface to fabric


The rexi application is responsible for the RPC layer inside a BigCouch cluster. It has two main tasks. First, it is the "target" of most RPC calls, in that it performs the actual process spawn and the reporting of errors. Second, it implements buffering of outgoing RPC requests.

RPC requests come in two flavors: a strict request/response approach and a request/streamed-response approach. Request/response is used everywhere except for views, all docs, and changes feeds. The functions here are rexi:cast/2,3,4 and rexi:reply/1.

The streaming API also uses rexi:cast/2,3,4, and remote workers respond using the rexi:stream2/1,2,3 and rexi:stream_last/1,2 calls. The stream functions allow a configurable number of messages to be in flight. Streaming pauses when this limit is reached until the worker receives ACKs from the coordinator. It's a very basic form of TCP-style flow control.
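The in-flight limit described above can be sketched as a small window counter. This is only a model of the idea, not rexi's implementation: the class name, the method names, and the explicit `ack` call are all hypothetical.

```python
from collections import deque


class StreamingWorker:
    """Models a remote worker that streams rows with a bounded window."""

    def __init__(self, rows, max_in_flight):
        self.pending = deque(rows)
        self.max_in_flight = max_in_flight
        self.in_flight = 0  # messages sent but not yet acknowledged

    def send_available(self):
        """Send rows until the in-flight limit is hit; return what was sent."""
        sent = []
        while self.pending and self.in_flight < self.max_in_flight:
            sent.append(self.pending.popleft())
            self.in_flight += 1
        return sent

    def ack(self, count=1):
        """Record that the coordinator acknowledged `count` messages."""
        self.in_flight = max(0, self.in_flight - count)
```

A worker with `max_in_flight=2` sends two rows, then sends nothing further until the coordinator acknowledges at least one of them.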

Most of the interesting code for rexi is in:

  • rexi.erl - The main client interface
  • rexi_server.erl - The server that spawns and tracks remote workers
  • rexi_buffer.erl - Buffers for RPC requests
  • rexi_utils.erl - Specifically, rexi_utils:recv/6 for receiving RPC messages


The mem3 application is responsible for storing and managing "cluster state". This is where sharding and hashing are implemented. It's responsible for managing information on each clustered database as well as deciding which shards documents belong to.

mem3 uses two "node local" databases to store its information. These are by default nodes and dbs. The nodes database stores a list of the nodes in the cluster along with a few bits of metadata for each node. Generally speaking this is a very small database.

The dbs db stores all of the information on clustered databases. Each document represents a single database. To determine whether a clustered database exists, we check to see if there's an undeleted document in this database (these documents are heavily cached in a parsed form that I'll discuss shortly).

Both the nodes and dbs databases are replicated throughout the cluster in a ring based on node name ordering.
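One way to picture a ring based on node name ordering is that each node pushes to the next node in sorted order, with the last node wrapping around to the first. The sketch below shows only that idea; the function name and the example node names are made up, and the actual target selection in mem3 may differ.

```python
def ring_targets(nodes):
    """Map each node to the next node in sorted name order, wrapping around."""
    ordered = sorted(nodes)
    if len(ordered) < 2:
        return {}  # a single node has nobody to replicate to
    return {
        node: ordered[(i + 1) % len(ordered)]
        for i, node in enumerate(ordered)
    }
```

For three nodes this produces a cycle, so a change written on any node eventually reaches every other node.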

One of the other major responsibilities of mem3 is its active anti-entropy via internal replication. As database shards are updated they are enqueued by mem3_sync to wait on internal replication. As the name implies, this is just an internal replicator that copies changes between shards. In normal operation each replication should be roughly a no-op except for some revision checks. Under high load, shards can diverge due to errors and slow nodes not receiving writes. This is one of two methods for fixing the divergence issue.

The mem3_shards module is one that most people will also be interested in. While the main API is handled in mem3.erl, this module is the implementation for loading, parsing, and caching database information. That information is stored as a series of #shard{} records that are used extensively throughout the cluster layer.

Important modules in mem3:

  • mem3.erl - Main user API
  • mem3_nodes.erl - Manages the list of nodes in the database
  • mem3_rep.erl - The internal replicator
  • mem3_sync.erl - The active anti-entropy module
  • mem3_shards.erl - The clustered database cache

There's also a mem3_rebalance.erl module included that has a bunch of logic for generating shard moves. It's important to note that it doesn't do any of the actual shard moves itself; it just generates the list of things to move. There's work going on to get the external tooling for these moves packaged into a distributable format.


The fabric application is the cluster API application. It's a pure library application that has no permanent processes or supervision tree. The core API is contained in fabric.erl, which makes calls to the various other modules in the application.

In general, fabric.erl will call fabric_function_name:go/N. Each of these "function modules" then triggers a number of RPC messages and handles the responses. The remote side of the RPC call is defined in the fabric_rpc.erl module. To read this code it's generally easiest to read through the remote side of the interaction first, to get an idea of what messages it's responding with, and then read the coordinator to see how those messages are handled.

The other form of anti-entropy is in Fabric's document read functions. This is a passive anti-entropy method called read-repair. People familiar with the Dynamo paper will have heard of this more standard anti-entropy method.

Basically, when we perform a quorum read and detect differences in the responses from each worker we will re-write the document back so that the nodes will be updated for the document in question.

There is a bit of a subtlety here in that we can use the revision history information to know which document is a "winner" and then return a response based on that knowledge while performing the read-repair asynchronously.
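The read-repair decision can be sketched as: pick a winner among the replies, then note which nodes returned something else. The sketch below is deliberately simplified and assumes revisions are plain "pos-hash" strings compared by position and then by hash; it ignores deletions and full revision trees, and the function names are hypothetical.

```python
def rev_key(rev):
    """Turn a 'pos-hash' revision string into a sortable (pos, hash) pair."""
    pos, _, digest = rev.partition("-")
    return (int(pos), digest)


def read_repair_plan(replies):
    """Given {node: rev}, return (winning_rev, nodes_that_need_repair)."""
    winner = max(replies.values(), key=rev_key)
    stale = sorted(node for node, rev in replies.items() if rev != winner)
    return winner, stale
```

The coordinator in this model can answer the client with the winner right away and write the winning document back to the stale nodes afterwards.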


The chttpd application is a very direct clone of the various couch_httpd* modules except that they've been updated to use fabric instead of calling directly into other couch_* modules.

Support Applications

  • b64url - URL compatible Base64 encoding/decoding in a NIF
  • cassim - Security properties storage
  • config - INI file based configuration (couch_config.erl as an app)
  • couch - All of the couch_* modules from the old src/couchdb
  • couch_event - Event notifications (replaces couch_db_update_notifier)
  • couch_log - Thin front-end to lager
  • ddoc_cache - Design document cache
  • ets_lru - An LRU based on ETS. Another library application
  • global_changes - Clustered version of _db_updates that supports resume
  • khash - A hashmap NIF used by couch_event


The b64url application is a pretty simple NIF that is just responsible for encoding and decoding URL-safe Base64 values. The sequence format that BigCouch uses can get a bit heavy, so we spent some time optimizing the encoding/decoding of these values.


The cassim application deals with security properties. The security properties for databases aren't governed by MVCC, so doing anti-entropy on these values is nearly impossible. There are some quorum-based attempts in mem3_sync_security that cover most of the easily correctable errors, but it's not perfect. Cassim exists to store security information for clustered databases in a clustered database. This information is then heavily cached for reference at runtime.

Currently the backing database for this app is called cassim but we should probably change that to _security and we'll want to add "before doc update" and "after doc read" functions.


The config application is just an extraction of the configuration code from CouchDB. For the other applications to use the config system they need to be able to depend on it so that it starts first. We had a lot of bugs with nodes randomly coming up with a default config because some applications started, and read their config, before couch was up.

There are a few minor extensions to the API so that we can have type enforcement. There's also an additional "reason" parameter that gets logged when a config change is made. This can be useful for operations to note why something was changed.


The couch_event application is a replacement for couch_db_update_notifier. In a nutshell, it switches away from using a gen_event behavior to a tree of hash maps and message passing. There are a number of reasons for this, but basically the gen_event model is inappropriate when you can have an unbounded number of event handlers. Given that each changes feed adds a handler, this leads to a significant bottleneck in the database node under high changes load.

Adam Kocoloski gave a talk somewhere with some good background on this particular change. I'll see if I can't find the slides and link them on the mailing list.


The couch_log application is just a single-file library application that wraps the lager API. The intention is that if people want to switch the logging implementation, they have a single place to do that.


The ddoc_cache application exists because it turns out that some people like to put a significant amount of JavaScript into their design documents. We've had a few customers with design documents in the multi-megabyte range. Given that we need to load these design documents for lots of different requests, it was a no-brainer to add a cache here. Somewhere there's a very pretty picture of a cluster's internal traffic going from many megabytes a second to a few kilobytes as this was flipped on.

The caching strategy here is a simple TTL based approach. We've added a new design document event message during the merge (rather, CouchDB had added it, ddoc_cache can now use it) which should allow us to make this caching strategy quite a bit better. We've also had a few discussions internally on how to implement the caching hot spots a bit better to reduce ets load. I'll try and distill all of that into a JIRA issue soon.
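For readers who want a picture of what a simple TTL-based approach looks like, here is a minimal sketch. It is not the ddoc_cache implementation: the class name, the `loader` callback, and the injectable clock are all assumptions made for illustration.

```python
import time


class TTLCache:
    """Caches loaded values and reloads them once they are older than the TTL."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock  # injectable so expiry can be tested
        self.entries = {}   # key -> (time_loaded, value)

    def get(self, key, loader):
        """Return the cached value, calling loader(key) if missing or expired."""
        now = self.clock()
        entry = self.entries.get(key)
        if entry is not None and now - entry[0] < self.ttl:
            return entry[1]
        value = loader(key)
        self.entries[key] = (now, value)
        return value
```

The weakness of a pure TTL is visible here: an updated design document is not noticed until its entry expires, which is why an event-driven invalidation could improve on it.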


The ets_lru application came about because we had the "LRU on ETS" pattern in a few places, so I've extracted it. We could probably stand to update a few of the CouchDB internals to use this as well. Views are a good example of where this would do some good.

That said, ETS can be a bit of a bottleneck under load. I've contemplated rewriting this a number of times as a NIF, which could turn it into a data structure with O(1) get/push/pop operations. I haven't gotten to it, though, mainly because couch_server wouldn't get any benefit. If we change our ref counting strategy it might work there as well.
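To show what an LRU with constant-time get and put looks like as a single data structure, here is a small sketch built on Python's OrderedDict. It only illustrates the general pattern; it says nothing about how ets_lru is laid out on ETS, and the class and method names are hypothetical.

```python
from collections import OrderedDict


class LRUCache:
    """A bounded cache that evicts the least recently used entry."""

    def __init__(self, max_size):
        if max_size < 1:
            raise ValueError("max_size must be at least 1")
        self.max_size = max_size
        self.items = OrderedDict()  # oldest entry first, newest last

    def get(self, key, default=None):
        """Return the value for key and mark it as most recently used."""
        if key not in self.items:
            return default
        self.items.move_to_end(key)
        return self.items[key]

    def put(self, key, value):
        """Insert or update key, evicting the oldest entries if over capacity."""
        self.items[key] = value
        self.items.move_to_end(key)
        while len(self.items) > self.max_size:
            self.items.popitem(last=False)
```

Reading an entry counts as a use, so a recently read key survives an eviction that removes an older, untouched one.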


The global_changes application is the backing for the clustered _db_updates API. It was designed to mimic the current _db_updates API as closely as possible. The main difference is that global_changes supports the ability to resume notifications. The subtlety here is that you can't know how many updates of each type occurred. I.e., if you were to listen to _db_updates for a while and then disconnect, upon reconnecting you might learn that database X was updated, but there is no information as to how many times it has been updated. It's the client's responsibility to investigate the changes on each database directly.

This app has a very subtle reliance on MVCC that I should point out. The way that this works without breaking everything is that it relies on identical updates to produce identical revisions. If it weren't for that, there would either be a huge number of conflicts or there would have to be a central coordinator for writes. It's important to note that this reliance means we can't include a timestamp or count or any other varying information on the update.

The other fun thing that this relies on is how the changes feed only stores one entry per document. This way as we record updates we're only storing a single entry for every (dbname, type) pair. Thus the maximum size of this database (in number of documents) is currently 3 * $num_docs_in_dbs_db because we record the three db events (created, deleted, updated).
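The two properties above can be sketched together: if the revision is derived only from the document content, then any node recording the same event produces the same revision, and keying documents by (dbname, type) bounds the database size. The document ID layout, the use of MD5 over JSON, and the function names below are assumptions for illustration, not what global_changes actually writes.

```python
import hashlib
import json

EVENT_TYPES = ("created", "deleted", "updated")


def event_doc(dbname, event_type):
    """Build a (doc_id, rev) pair that depends only on dbname and event type."""
    if event_type not in EVENT_TYPES:
        raise ValueError("unknown event type: %r" % (event_type,))
    body = {"_id": "%s:%s" % (event_type, dbname)}  # hypothetical ID layout
    encoded = json.dumps(body, sort_keys=True).encode("utf-8")
    # No timestamp or counter goes into the hash, so every node recording
    # this event computes the same revision.
    return body["_id"], "1-" + hashlib.md5(encoded).hexdigest()


def max_event_docs(num_clustered_dbs):
    """Upper bound on documents: one per (dbname, type) pair."""
    return len(EVENT_TYPES) * num_clustered_dbs
```

Adding a timestamp to `body` would make each node's write hash differently, which is exactly the conflict problem described above.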


The khash application is a small NIF wrapper around the kazlib hash map. Currently it's only used by couch_event. I went with the NIF wrapper instead of ETS because couch_event needed to store an unbounded tree of mappings.

Misc Dependencies

  • goldrush - A lager dependency
  • lager - Logging library for Erlang
  • meck - Mocking for Erlang used in EUnit tests
  • jiffy - JSON decoder/encoder


@cararemixed cararemixed commented Aug 15, 2014

A side note on ets_lru: the bottleneck is more in the gen_server process than in ETS itself. Once the merge is stable, I might take a stab at replacing it with something that handles load spikes better. There are some other drawbacks with ets_lru that I can discuss at length, perhaps on IRC.



@wohali wohali commented Aug 15, 2014

Glad to see you hanging around, @strmpnk. It'd be best if you can drop that in an email to once you've got your ideas written down, so we have a record of it. IRC isn't logged (intentionally) and can cause some people to feel left out... :)



@andywenk andywenk commented Aug 20, 2014

small typo here?:

"In general, fabric.erl will call fabrc_function_name:go/N."

shouldn't it read

"... call fabric_function_name:go/N."

