This document gives a brief tour of the BigCouch source tree along with some background on the included applications. It is a fairly high-level overview intended to help CouchDB developers become familiar enough with the code to know where to start looking when they wish to work on clustered code.
This is mostly aimed at developers who are already familiar with CouchDB and its various internals, as an on-boarding document for the newly added clustering layer. As such, most of the existing CouchDB code is elided from this discussion.
A BigCouch cluster is a fairly straightforward implementation of consistent hashing which relies on CouchDB's existing replication semantics for the "hard parts" of clustering (i.e., anti-entropy).
In BigCouch a "database" is actually a collection of many "CouchDB databases". To make the distinction we refer to the underlying "CouchDB databases" as shards when talking about a clustered database.
A clustered database has two main parameters set when it is created: N, the number of copies of each shard range, and Q, the number of shard ranges. A clustered database will then have N * Q shards on disk. It's important to note that N is effectively limited by the number of nodes in the cluster, as there's not much sense in creating two copies of a shard on the same node. By default N is 3, and Q is usually 4 or 8. For databases that are expected to grow significantly, a larger Q should be chosen. Changing Q is hard; changing N is less so, but N should always be at least 3.
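The layout math above can be sketched as follows. This is a hypothetical illustration, not BigCouch code; the 32-bit key space and the helper names are assumptions:

```python
# Hypothetical sketch of how a clustered database's shard layout is derived.
# Q shard ranges partition a hash key space; each range is stored N times.

HASH_SPACE = 2 ** 32  # assumption: a 32-bit key space

def shard_ranges(q):
    """Split the hash space into q contiguous [begin, end] ranges."""
    step = HASH_SPACE // q
    return [(i * step, (i + 1) * step - 1) for i in range(q)]

n, q = 3, 8                 # the defaults described above
ranges = shard_ranges(q)
total_shards = n * q        # shard files on disk across the cluster

print(total_shards)         # 24
print(ranges[0])            # (0, 536870911)
```

With the defaults of N = 3 and Q = 8 this gives 24 shard files spread across the cluster.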
When a user makes a read or write request they provide what's called the quorum parameter, named either `r` or `w` depending on whether it's a read or a write. This value determines how many responses will be considered before returning to the client. Given N = 3, `r` and `w` can be 1, 2, or 3. The default for both `r` and `w` is 2 for normal document operations.
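A minimal sketch of the quorum idea (illustrative only; fabric's actual coordinator logic is more involved):

```python
# Toy model of a quorum read: the coordinator considers worker responses
# in arrival order and answers once r of them agree.

def quorum_read(responses, r=2):
    """responses: iterable of (node, value) pairs in arrival order.
    Return the first value seen r times, or None if no quorum forms."""
    counts = {}
    for _node, value in responses:
        counts[value] = counts.get(value, 0) + 1
        if counts[value] >= r:
            return value
    return None

# Two of three copies agree, so r=2 is satisfied by the second response.
print(quorum_read([("n1", "rev-2"), ("n2", "rev-2"), ("n3", "rev-1")], r=2))
```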
Views, on the other hand, are always the equivalent of an `r=1` request, as there's no direct method to resolve differences between all combinations of shards that produce any given view response.
There are four core applications added to facilitate the clustering layer:
- rexi - An RPC framework similar to rex
- mem3 - Maintains cluster state information
- fabric - The clustered API
- chttpd - An HTTP interface to fabric
The `rexi` application is responsible for the RPC layer inside a BigCouch cluster. It has two main tasks. First, it is the "target" of most RPC calls, in that it performs the actual process spawn and reports errors. Second, it implements buffering of outgoing RPC requests.
RPC requests come in two flavors: a strict request/response approach as well as a request/streamed-response flavor. Request/response is used everywhere except for views, `_all_docs`, and changes feeds. The functions here are `rexi:cast/2,3,4` and `rexi:reply/1`.
The streaming API also uses `rexi:cast/2,3,4`, and remote workers respond using the `rexi:stream2/1,2,3` and `rexi:stream_last/1,2` calls. The stream functions allow a configurable number of messages to be in flight; streaming pauses when this limit is reached until ACKs are received from the coordinator. It's a very basic form of TCP-style flow control.
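The windowing behavior can be modeled like this. A toy sketch only: the function names and the "one ACK frees one slot" simplification are assumptions, not the rexi API:

```python
# Toy model of rexi-style stream flow control: the worker may have at most
# `limit` unacknowledged messages in flight and must pause until the
# coordinator ACKs one before sending more.

from collections import deque

def stream(messages, limit=10):
    inflight = deque()   # messages sent but not yet ACKed
    delivered = []       # messages the coordinator has ACKed
    for msg in messages:
        if len(inflight) == limit:
            # window full: block until one ACK arrives (simulated here)
            delivered.append(inflight.popleft())
        inflight.append(msg)
    delivered.extend(inflight)  # drain the rest after stream_last
    return delivered

sent = stream(range(25), limit=10)
print(len(sent))  # 25, in order, despite the 10-message window
```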
Most of the interesting code for `rexi` is in:

- `rexi.erl` - The main client interface
- `rexi_server.erl` - The server that spawns and tracks remote workers
- `rexi_buffer.erl` - Buffers for outgoing RPC requests
- `rexi_utils.erl` - Specifically, `rexi_utils:recv/6` for receiving RPC messages
The `mem3` application is responsible for storing and managing "cluster state". This is where sharding and hashing are implemented. It's responsible for managing information on each clustered database as well as deciding which shards documents belong to.
`mem3` uses two "node local" databases to store its information, by default named `nodes` and `dbs`. The `nodes` database stores a list of the nodes in the cluster along with a few bits of metadata for each node. Generally speaking this is a very small database.
The `dbs` database stores all of the information on clustered databases. Each document represents a single database. To determine whether a clustered database exists we check for an undeleted document in this database. (These documents are heavily cached in a parsed form I'll discuss shortly.)
Both the `nodes` and `dbs` databases are replicated throughout the cluster in a ring based on node name ordering.
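The ring ordering can be illustrated with a small sketch (hypothetical helper; mem3's actual replication topology logic lives in Erlang):

```python
# Sketch of a ring based on node-name ordering: each node replicates the
# node-local databases to its successor in sorted order, wrapping around.

def ring_successor(nodes, me):
    ordered = sorted(nodes)
    i = ordered.index(me)
    return ordered[(i + 1) % len(ordered)]

nodes = ["node3@host", "node1@host", "node2@host"]
print(ring_successor(nodes, "node1@host"))  # node2@host
print(ring_successor(nodes, "node3@host"))  # node1@host (wraps around)
```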
One of the other major responsibilities of `mem3` is active anti-entropy via internal replication. As database shards are updated they are enqueued by `mem3_sync` to wait for internal replication. As the name implies, this is just an internal replicator that copies changes between shards. In normal operation each replication should be roughly a no-op apart from some revision checks. Under high load shards can diverge due to errors and slow nodes not receiving writes; this is one of two methods for fixing that divergence.
The `mem3_shards` module is one most people will also be interested in. While the main API is handled in `mem3.erl`, this is the implementation for loading, parsing, and caching database information. This information is stored as a series of `#shard{}` records that are used extensively throughout the clustering layer.
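Conceptually, routing a document to a shard looks like the sketch below. This is illustrative only: `zlib.crc32` stands in for whatever hash the cluster actually uses, and `shard_for` is a hypothetical helper:

```python
# Hypothetical sketch of picking the shard range for a document: hash the
# doc id into the key space and find the range containing it.

import zlib

def shard_for(doc_id, ranges):
    key = zlib.crc32(doc_id.encode()) & 0xFFFFFFFF
    for begin, end in ranges:
        if begin <= key <= end:
            return (begin, end)

# Four contiguous ranges covering the full 32-bit space (Q = 4).
step = 2 ** 32 // 4
ranges = [(i * step, (i + 1) * step - 1) for i in range(4)]
print(shard_for("mydoc", ranges) in ranges)  # True
```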
Important modules in `mem3`:

- `mem3.erl` - Main user API
- `mem3_nodes.erl` - Manages the list of nodes in the database
- `mem3_rep.erl` - The internal replicator
- `mem3_sync.erl` - The active anti-entropy module
- `mem3_shards.erl` - The clustered database cache
There's also a `mem3_rebalance.erl` module that has a bunch of logic for generating shard moves. It's important to note that it doesn't perform any of the actual shard moves itself; it just generates the list of things to move. There's work going on to get the external tooling for these moves packaged into a distributable format.
The `fabric` application is the cluster API application. It's a pure library application with no permanent processes or supervision tree. The core API is contained in `fabric.erl`, which makes calls to the various other modules in the application.
In general, `fabric.erl` will call `fabric_function_name:go/N`. Each of these "function modules" triggers a number of RPC messages and then handles the responses. The remote side of the RPC call is defined in the `fabric_rpc.erl` module. To read this code it's generally easiest to read through the remote side of the interaction first, to get an idea of what messages it responds with, and then read the coordinator to see how those messages are handled.
The other form of anti-entropy is in fabric's document read functions. This is a passive anti-entropy method called read-repair. People familiar with the Dynamo paper will have heard of this more standard anti-entropy method.
Basically, when we perform a quorum read and detect differences in the responses from each worker, we re-write the document back so that the nodes are updated for the document in question.
There is a bit of a subtlety here: we can use the revision history information to know which document is the "winner", and return a response based on that knowledge while performing the read-repair asynchronously.
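The read-repair idea reduces to something like the following toy model. It is not fabric's code; revisions are simplified to integers and the write-back is only noted, not performed:

```python
# Toy model of read repair: compare the revisions returned by each copy,
# pick the winner, and note which copies were behind so they can be
# re-written asynchronously.

def read_repair(copies):
    """copies: dict of node -> revision number.
    Returns (winning_rev, nodes_needing_repair)."""
    winner = max(copies.values())
    stale = sorted(node for node, rev in copies.items() if rev < winner)
    # the real system would asynchronously write the winning doc to `stale`
    # while already returning `winner` to the client
    return winner, stale

print(read_repair({"n1": 3, "n2": 3, "n3": 2}))  # (3, ['n3'])
```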
The `chttpd` application is a very direct clone of the various `couch_httpd*` modules, except that they've been updated to use `fabric` instead of calling directly into other `couch_*` modules.
- `b64url` - URL compatible Base64 encoding/decoding in a NIF
- `cassim` - Security properties storage
- `config` - INI file based configuration (`couch_config.erl` as an app)
- `couch` - All of the `couch_*` modules from the old `src/couchdb`
- `couch_event` - Event notifications (replaces `couch_db_update_notifier`)
- `couch_log` - Thin front-end to lager
- `ddoc_cache` - Design document cache
- `ets_lru` - An LRU based on ETS; another library application
- `global_changes` - Clustered version of `_db_updates` that supports resume
- `khash` - A hashmap NIF used by `couch_event`
This is a pretty simple NIF that is just responsible for encoding and decoding Base64 URL values. The sequence format that BigCouch uses can get a bit heavy, so we spent some time optimizing the encoding/decoding of these values.
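For reference, the round trip looks like this using Python's stdlib as a stand-in for the `b64url` NIF (padding handling here is an assumption about the format, not taken from the NIF):

```python
# URL-safe Base64 round trip: encode without padding (as typically done in
# URLs), then restore padding before decoding.

import base64

seq = b"\x00\x01\xfe\xff update sequence bytes"
encoded = base64.urlsafe_b64encode(seq).rstrip(b"=")
padded = encoded + b"=" * (-len(encoded) % 4)
print(base64.urlsafe_b64decode(padded) == seq)  # True
```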
The security properties for databases aren't governed by MVCC, so anti-entropy on these values is nearly impossible. There are some quorum based attempts in `mem3_sync_security` that cover most of the easily correctable errors, but it's not perfect. Cassim exists to store security information for clustered databases in a clustered database. This information is then heavily cached for reference at runtime.
Currently the backing database for this app is called `cassim`, but we should probably change that to `_security`, and we'll want to add "before doc update" and "after doc read" functions.
This is just an extraction of the configuration code from CouchDB. For the other applications to use the config system they need to be able to depend on it so that it starts first. We had a lot of bugs with nodes randomly coming up with a default config because some applications started before couch was up and had read its config.
There are a few minor extensions to the API so that we can have type enforcement. There's also an additional "reason" parameter that gets logged when a config change is made. This can be useful for operations to note why something was changed.
This is a replacement for `couch_db_update_notifier`. In a nutshell, it switches from a `gen_event` behavior to a tree of hash maps and message passing. There are a number of reasons for this, but basically the `gen_event` model is inappropriate when you can have an unbounded number of event handlers. Given that each changes feed adds a handler, this leads to a significant bottleneck on the database node under high changes load.
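The "tree of hash maps" idea can be sketched as follows. This is illustrative only; the registration and dispatch function names are assumptions, not the `couch_event` API:

```python
# Sketch of map-based event dispatch: listeners register per database name,
# so a notification touches only that database's subscribers instead of
# calling every handler the way a single gen_event manager would.

from collections import defaultdict

listeners = defaultdict(set)  # dbname -> set of subscriber ids

def register(dbname, pid):
    listeners[dbname].add(pid)

def notify(dbname, event):
    # one map lookup, regardless of how many other dbs have listeners
    return [(pid, dbname, event) for pid in sorted(listeners[dbname])]

register("db1", "pid_a")
register("db1", "pid_b")
register("db2", "pid_c")
print(len(notify("db1", "updated")))  # 2
```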
Adam Kocoloski gave a talk somewhere with some good background on this particular change. I'll see if I can't find the slides and link them on the mailing list.
This is just a single-file library application that wraps the `lager` API. The intention is to give people a single place to change if they want to switch the logging implementation.
So it turns out that some people like to put a significant amount of JavaScript into their design documents. We've had a few customers with design documents in the multi-megabyte range. Given that we need to load these design documents for lots of different requests, adding a cache here was a no-brainer. Somewhere there's a very pretty picture of a cluster's internal traffic dropping from many megabytes per second to a few kilobytes as this was flipped on.
The caching strategy here is a simple TTL based approach. We've added a new design document event message during the merge (rather, CouchDB had added it, and ddoc_cache can now use it) which should allow us to make this caching strategy quite a bit better. We've also had a few discussions internally on how to handle caching hot spots a bit better to reduce ETS load. I'll try to distill all of that into a JIRA issue soon.
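A TTL cache in the spirit of this strategy can be sketched in a few lines (illustrative; the real cache is Erlang/ETS based and the class name is made up):

```python
# Minimal TTL cache sketch: entries expire after a fixed time-to-live,
# after which the next read reloads them via the supplied loader.

import time

class TTLCache:
    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.store = {}  # key -> (value, expiry timestamp)

    def get(self, key, loader):
        now = time.monotonic()
        hit = self.store.get(key)
        if hit and hit[1] > now:
            return hit[0]            # fresh entry: serve from cache
        value = loader()             # miss or expired: reload the ddoc
        self.store[key] = (value, now + self.ttl)
        return value

cache = TTLCache(ttl_seconds=5)
loads = []
cache.get("_design/app", lambda: loads.append(1) or {"views": {}})
cache.get("_design/app", lambda: loads.append(1) or {"views": {}})
print(len(loads))  # 1 -- the second call was served from cache
```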
We had the "LRU on ETS" pattern in a few places so I've extracted it. We could probably stand to update a few of the CouchDB internals to use this as well; views are a good example of where it would do some good.
ETS can be a bit of a bottleneck under load, though. I've contemplated rewriting this a number of times as a NIF, which could turn it into a data structure with O(1) get/push/pop operations. I haven't gotten to it, mainly because couch_server wouldn't benefit, though if we change our ref counting strategy it might work there as well.
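For reference, the O(1) get/push/pop behavior an LRU aims for looks like this sketch using Python's `OrderedDict` (illustrative; ets_lru itself is Erlang):

```python
# LRU cache sketch: the most recently used entries survive; inserting past
# capacity evicts the least recently used one. All operations are O(1).

from collections import OrderedDict

class LRU:
    def __init__(self, capacity):
        self.capacity = capacity
        self.items = OrderedDict()  # insertion order tracks recency

    def get(self, key):
        if key not in self.items:
            return None
        self.items.move_to_end(key)  # mark as most recently used
        return self.items[key]

    def put(self, key, value):
        self.items[key] = value
        self.items.move_to_end(key)
        if len(self.items) > self.capacity:
            self.items.popitem(last=False)  # evict least recently used

lru = LRU(2)
lru.put("a", 1)
lru.put("b", 2)
lru.get("a")         # touch "a" so "b" becomes the eviction candidate
lru.put("c", 3)
print(lru.get("b"))  # None -- "b" was evicted
print(lru.get("a"))  # 1
```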
This application is the backing for the clustered `_db_updates` API. It was designed to mimic the existing `_db_updates` API as closely as possible. The main difference is that `global_changes` supports the ability to resume notifications. The subtlety here is that you can't know how many updates of each type occurred. I.e., if you were to listen to `_db_updates` for a while and then disconnect, upon reconnecting you might learn that database X was updated, but there is no information as to how many times it was updated. It's the client's responsibility to investigate the changes on each database directly.
This app has a very subtle reliance on MVCC that I should point out. The way this works without breaking everything is that it relies on identical updates producing identical revisions. If it weren't for that, there would either be a huge number of conflicts or there would have to be a central coordinator for writes. It's important to note that this reliance means we can't include a timestamp, a count, or any other information in the update.
The other fun thing this relies on is that the changes feed only stores one entry per document. As we record updates we're only storing a single entry for every (dbname, type) pair, so the maximum size of this database (in number of documents) is currently 3 * $num_docs_in_dbs_db, because we record three db events (created, deleted, updated).
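This bound follows from the dedupe-by-key behavior, sketched here as a toy model (the record shape and function name are made up for illustration):

```python
# Toy model of global_changes' bounded size: one record per (dbname, event)
# pair, so repeated updates overwrite rather than accumulate. No counts or
# timestamps are stored, which is what keeps writes identical (and thus
# revision-stable under MVCC).

def record(store, dbname, event):
    store[(dbname, event)] = True  # idempotent: same update, same entry

store = {}
for _ in range(100):
    record(store, "db1", "updated")
record(store, "db1", "created")
print(len(store))  # 2 -- at most 3 entries per database, never 101
```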
This is a small NIF wrapper around the kazlib hash map. Currently it's only used by `couch_event`. I went with the NIF wrapper instead of ETS because `couch_event` needed to store an unbounded tree of mappings.
- `goldrush` - A `lager` dependency
- `lager` - Logging library for Erlang
- `meck` - Mocking for Erlang, used in EUnit tests
- `jiffy` - JSON decoder/encoder
A side note on `ets_lru`: the bottleneck is more in the gen_server process than in ETS itself. Once the merge is stable, I might take a stab at replacing it with something that handles load spikes better. There are some other drawbacks with `ets_lru` that I can discuss at length, perhaps on IRC.