Skip to content

Instantly share code, notes, and snippets.

@quinthar
Last active December 14, 2020 19:09
Show Gist options
  • Save quinthar/31d8e58aac8432426cadd2f71496d984 to your computer and use it in GitHub Desktop.
Save quinthar/31d8e58aac8432426cadd2f71496d984 to your computer and use it in GitHub Desktop.
Explanation of how a cluster might do multi-leader replicated transactions using vector clocks

Explanation of how a cluster might do multi-leader replicated transactions using vector clocks

BedrockDB is a highly distributed, WAN-replicated, fault-tolerant database: read BedrockDB.com for more. Of particular relevance to this Gist, the design is such that the client (eg, webserver) sends every "command" (ie, to trigger a stored procedure written in C++ on the Bedrock node) to any node in the cluster -- the client needn't treat any node special. However, in practice one of the nodes is very special: it is the "leader", which coordinates distributed two-phase commit transactions.

Problem:

In practice the extra burden upon the leader isn't normally material: most commands that generate write activity generate roughtly the same number of "extra" queries on the leader and follower -- after all, anything that changes the database needs to be replicated out to followers using the same queries. However, there are some write commands that do a tremendous amount of "extra reads" (or computational processing) that can disproportionally burden the leader. One example of that kind of "read heavy write command" is billing: it involves querying and analyzing a ton of data, to insert a single row into the billing table. This was bad enough that we hacked around it to run the "read" portion on the followers -- and do some other gymnastics to spread the load between the followers -- such that only the trivial write gets escalated at the end. This "works", but is a bit of a hack and not really generalizable.

A second problem revolves around administering the system in the real world. In theory the extra load upon the leader should be negligible, and you shouldn't need to care about which node is the leader. In practice the fact that every write command is escalated to one node means that you need to be very attuned to the actual production configuration at all times -- and make sure that the leader has the lowest latency to the other other nodes. There are a wide variety of practical problems that come up as a result of the inherently unbalanced "single leader" design. These range from the simple ones like:

"The leader logs a lot more data than the others, so it's the first to suffer problems when the log system gets backed up"

To very complicated ones like:

"Node A can talk to B and B to C, but the network has broken in a fashion such that A can't talk to C. A is the higest priority, and C is second. Both A and C think it's their job to become LEADER, so both attempt to stand up. However, when C attempts, it is told by B there's already a leader, so it should stand down. C can't find the leader, so attempts to stand up again and again, forever -- in the process "black holing" any commands sent to it from the webserver. (And expand this for a 6-server cluster with 1 permafollower, across a WAN VPN, and introduce long timeouts, flaky connections, etc.)

In practice, there is a high "baseline hassle" to having an unbalanced design that we have just come to accept as the cost of doing business. I think a different design that is naturally balanced might have a bit more complexity in the code (that we can test and fix in a dev environment), but a lot less complexity in the real world (which is much, much harder to control -- and much more expensive when things fail).

Solution:

This Gist outlines a possible path forward to upgrading Bedrock to be truly multi-leader, such that every node is truly equal (every node "leads" its own transactions, and "follows" the transactions of others). The key is a "vector timestamp", which just means "the local commit count of this transaction on this node". 

Example 1: A single transaction

So imagine there were 5 nodes: A-E.  And imagine all of them have empty databases with zero commits.  Further imagine A sent out the first transaction, and everyone else was idle.  Additionally, imagine there was a PREPARE command (described in detail at the end), which confirmed that the current transaction doesn't conflict with any other PREPARED or COMMITTED transactions. The messages would be like this for a very trivial example of a single transaction in an otherwise idle cluster:

  1. A would BEGIN CONCURRENT a SQL transaction like normal, and when done, run PREPARE on it then send A0: BEGIN Alice to B-E, which means:

    "Hi, I'm node A, and have obtained quorum for 0 transactions.  I've just successfully prepared my first transaction for us all to commit, which I will tentatively name Alice. Please prepare Alice for commit!"

  2. B would PREPARE Alice, and upon success, respond back to A with B0: APPROVED Alice, which means:

    "Hi A, I'm B, and I have also obtained quorum for 0 transactions.  I was able to successfully prepare your Alice transaction, meaning so far as I know, it doesn't conflict with any transaction I've ever committed, or that I have prepared but not yet committed. (It might conflict with other transactions I'm doing that haven't yet been prepared, but I can't know that yet because they're not done -- and if they do conflict, they're SoL, you're transaction comes first.)"

  3. C-E would do the same thing as B in (2), because in this simple example, the whole cluster is idle except for the single Alice transaction being led by A.

  4. Once A receives approval from 2 other nodes -- without even waiting for the others -- it has quorum.  So let's assume in this example, all of B-E approve of the transaction, but C and E reply first. A would then broadcast A1: COMMIT Alice[1, *, 0, *, 0] to the rest of the cluster, which means:

    "Alright everyone, I'm A, and I've obtained quorum for 1 transaction.  Additionally, I hereby command you to commit the Alice transaction you've previously said you've successfully prepared, which I am assigning the globally unique vector timestamp of [1, *, 0, *, 0]. Breaking it down, the name means:

    • Alice - It was led by node A
    • [1, - It was the first transaction led by A
    • *, - It did not receive approval from B
    • 0, - It was approved by C when C had obtained quorum to commit 0 transactions
    • *, - It did not receive approval from D
    • 0] - It was approved by E when E had obtained quorum to commit 0 transactions
  5. When A receives the approval from B and D, it just ignores them -- the transaction has already been approved by quorum and committed, so it doesn't matter what they think anymore.

  6. When B-E receive the COMMIT command, they commit the previously prepared transaction, and add it to their local journal (ie, a list of every transaction they've ever committed -- see private blockchain for more) for future refernece.

It's easy how to imagine this trivial example handles individual node failures:

  • Nodes that hang, disconnect, or otherwise become unresponsive don't hold up the boat
  • Nodes that have some local database problem not shared by the rest of the cluster -- out of disk, corrupt sector, etc -- don't hold up the boat

How a vector-time journal works

Critical to a clustered system is enabling new nodes to "catch up" by replaying transactions in the "correct" order. In a single-leader cluster, this is really easy to imagine: just commit every transaction in the same order it was committed in the leader. This is how Bedrock's journal currently works: it assigns every commit a single scalar id, which increments monotonically. Bedrock currently has a single "timeline" that all nodes are expected to follow; anyone who disagrees with that timeline is kicked out. So "sorting" the commits is real simple:

bool scalarLessThan(lhs, rhs) {
    return lhs < rhs;
}

But vector timestamps blow all that up because there isn't just one timeline. They're more like a multiverse where there are multiple highly similar views of reality that all eventually end in the same place, but take negligibly different paths to get there. The butterfly might go left in one timeline, and right in another, but it always lands in the same place eventually. Accordingly, when a node comes back into the cluster, it has a trickier time deciding the order of events it wants to apply in its timeline. One solution is to just copy the order of some other timeline, but that's not actually necessary: there are many orders that could work. So long as the order picked doens't result in any conflicts, it doesn't really matter.

The purpose of a vector clock is to identify when order matters, and when it doesn't. So if you view each node as having a different timeline, the only order that matters is the order as experienced in each timeline. In the "fully qualified" case of two vector timestamps that have every component defined, this could be compared as follows:

bool lessThan(lhs, rhs) {
    return lhs[0]<rhs[0] && 
           lhs[1]<rhs[1] &&
           lhs[2]<rhs[2] &&
           lhs[3]<rhs[3] &&
           lhs[4]<rhs[4];
}

Note that every component of the vector needs to be less for it to compare. So that means:

  • [1,0,0,0,0] IS less than [2,0,0,0,0]. This means you need to commit the LHS transaction before the RHS transaction.
  • [1,0,0,0,0] IS NOT less than [0,0,0,0,2]. This means you do not need to commit the LHS before the RHS.
  • But this is where it gets mind bending: [0,0,0,0,2] isn't less than [1,0,0,0,0] either. Which means that you can commit LHS and RHS in either order! :mindblown:

Now, in practice you don't actually need to sort the whole journal: what you really want is "which commit should I apply next". And given that time "advances" only when a node initiates and commits a transaction (and even if you are running multiple transactions in parallel, in practice the "commits" are serialized on each node), this means every "tick" of the clock equates to a single transaction being commit on a single node.

So if you assume every node starts with T=0 in its timeline, a new node bootstrapping into the cluster has a vector timestamp of [0,0,0,0,0], and you learn of the following transactions:

  • A1 = [1,0,0,0,0]
  • A2 = [2,0,0,0,0]
  • B1 = [2,1,0,0,0]
  • B2 = [2,2,0,0,0]
  • C1 = [0,0,1,0,0]
  • D1 = [2,1,0,1,0]
  • E2 = [0,0,0,0,2]

There are a number of valid orders:

  • A1, A2, B1, B2, C1, D1
  • A1, C1, A2, B1, D1, B1

And so on. But you can never apply E2 because there is no E1 -- it would require jumping forward "two ticks" in the E timeline to apply that, and time needs to advance at most one tick per timeline in a given transaction.

(One could imagine some commit A3B3 = [3,3,0,0,0] that could be commit after A1,A2,B1,B2 which would advance the A and B timelines simultaneously, but I'm not sure when that would happen in practice... maybe some kind of timeline merging..?)

And a final note to make things even more trippy, recall how in the above trivial example, we don't actually wait for all nodes to respond before "calling it" -- once we have quorum, we charge forward. This means that in practice, vector timestamps are not "fully qualified": we should only compare the components of the timestamp that are in fact qualfied. So if your local database had a vector timestamp of [1,2,3,4,5] then you can safely apply [2,2,3,*,*] because it only advances the A by one tick, and matches the timelines of B-C, but leaves D-E unspecified.

Now this is the million dollar question: why does that work? What system can create vector-timestampped commits in this way that are mathematically guaranteed to not conflict? That's where the magic of quorum comes in!

Why quorum-approved, vector-timestampped commits cannot conflict

I got this idea while reading Ball Lightning by Liu Cixin which talked about how the exact location of a particle is indeterminate until "observed". Everything about quantum mechanics sounds utterly insane, but the idea of an "observer" has always seemed the most absurd of all to me as it seemed to codify an anthropocentric viewpoint to physics -- which I just find implausible. But the book suggested to me that an "observer" is not human consciousness, but rather any particle whose state is strongly influenced by the location of that particle. Said another way, the universe doesn't assign a location to any given particle until it "matters" -- and the more particle states that are ultimately influenced by the exact position of a partical, the stronger the "observer effect", and the faster the probabilistic waveform "collapses" to a determined location in space. Or said yet another way: particle locations are determined by a consensus of other particles. Or one more way: reality is defined by quorum agreement.

Now, I have no idea if any of that is even remotely true. But it also doesn't really matter. What does matter is that reality so far as the cluster is concerned is that which over half of the cluster agrees upon -- and they are coming to that agreement collaboratively by only "approving" commits that have been validated as non-conflicting with all other commits being evaluated by the cluster at that moment. And so far as I can determine, it's not possible for the cluster to simultaneously (or really, in overlapping time windows) obtain quorum approval of two conflicting transaction.

Furthermore, I think this is true even if different quorums participate. For example, I think this is true even if quorum A,B,C approves one transaction while C,D,E approves another because the nature of quorum means every quorum group will always overlap -- in this case with C, who will evaluate both commits simultaneously and confirm that they are non-conflicting.

I'd love help confirming this. I've been thinking hard on this and can't come up with a scenario where it fails. But it's also pretty headache-inducing stuff, so it'd be awesome if someone who is much better at mathematical proofs could either confirm or deny this. Until then, I'm just taking it on a leap of faith.

Example 2: Simultaneous non-conflicting transactions from different nodes

To illustrate all of the above, let's consider the same 5-node cluster (A-E), all of which start with empty databases (eg, [0,0,0,0,0]). A and B start two non-conflicting transactions at the exact same moment -- down to the nanosecond (none of this system depends on wall clock time in any way). This is how it'd work:

  1. A0: BEGIN Alice="CREATE TABLE foo (bar);"
  2. B0: BEGIN Bob="CREATE TABLE hello (world);"
  3. A0: APPROVE Bob (repeated by C first, then D and E later)
  4. B0: APPROVE Alice (repeated by D first, then C and E later)
  5. A1: COMMIT Alice[1,0,*,0,*]
  6. B1: COMMIT Bob[0,1,0,*,*]

Example 3: Simultaneous non-conflicting transactions from the same node

If we assume each node is multi-threaded, then it will actually have multiple write transactions going on simultaneously. They are serialized within each node upon "commit" to create a canonical order insofar as each individual node is concerned about its own commits, but at any point in time a single node might be "negotiating" multiple distributed transactions (ie, seeking quorum) for multiple transactions. This system handles approvals for transactions originating from the same node to be approved out of order (and is why transactions are given temporary, non-monotonical names up until final commit).

For this, let's extend Example 2. All five databases have two tables, foo and hello. The local state of all their databases is [1,1,0,0,0], meaning they have each committed one transaction from A and B, and none from the rest.

  1. A1: BEGIN Alex="INSERT INTO foo VALUES ('Alex');"
  2. A1: BEGIN Andre="INSERT INTO hello VALUES ('Andre');"
  3. B1: APPROVE Andre (repeated by C first, then D and E)
  4. B1: APPROVE Alex (repeated by C first, then D and E)
  5. A2: COMMIT Andre[2,1,0,*,*]
  6. A3: COMMIT Alex[3,1,0,*,*]

In this case, A's local multi-threaded conflict detection has confirmed that Alex and Andre do not conflict so far as A is concerned -- it's fine committing them in either order. However, it records the order that it did commit them such that when others are replaying the transactions, they can replay them in the same order. This way the replication code doesn't need to redo conflict detection: it knows for certain that transactions can be applied safely in a given order.

Example 4: Simultaneous conflicting transactions from different nodes

Now for the main event. Let's extend the Example 3: five databases all at [3,1,0,0,0], each with a foo and hello table containing Alex and Andre respectively. Let's try this:

  1. A3: BEGIN Andrew="UPDATE foo SET bar='Andrew' WHERE bar='Alex';"
  2. B1: BEGIN Betty="UPDATE foo SET bar='Betty' WHERE bar='Alex';"

So in this case, two conflicting transactions have been begun by two different nodes, at the same exact time. And let's assume each of those gets some approval from the "local node" in the same datacenter (ie, C is in the same datacenter as A so the message gets there faster; the same for B and D):

  1. C0: APPROVE Andrew
  2. D0: APPROVE Betty

Soon the conflicting message gets to the other datacenter, and when they try to PREPARE the transaction, a conflict is discovered and the transaction is rejected. In this case it notes what specific pecific transaction it conflicts with (the importance of which is explained later) and shares it (which isn't strictly necessary but helpful for debugging):

  1. A0: REJECT Betty FOR Andrew
  2. B1: REJECT Andrew FOR Betty
  3. C0: REJECT Betty FOR Andrew
  4. D0: REJECT Andrew FOR Betty

So far it's neck and neck:

  • Andrew has two approvals (from A implicitly, and from C explicitly), and two rejections (from B and D)
  • Betty has two approvals (from B implicitly, and from D explicitly), and two rejections (from A, and C)

But it all comes down to which can get the message to E first to be a tiebreaker; let's assume A wins:

  1. E0: APPROVE Andrew
  2. E0: REJECT Betty FOR Andrew

At this point, A receives its third approval, commits it locally, and and broadcasts that it achieved quorum between A4 (not A3, because A just committed), C0, and E0):

  1. A4: COMMIT Andrew=[4,*,0,*,0]

And around the same time, B receives its third rejection, which causes it to rollback locally, and broadcast the same to whomever approved it -- in this case, D):

  1. B1: ROLLBACK Betty

And then eventually both B and D -- which previously rejected Andrew -- receive the command to commit it. This is where things get tricky (as if everything else up to this was straightforward). Now, if the COMMIT Andrew command comes after the ROLLBACK Betty, then it will probably just be able to commit directly: whatever conflict Andrew had with Betty is gone, because Betty has been rolled back.

But let's say the COMMIT Andrew comes before ROLLBACK Betty -- what then? Well, I think whenever a node gets a COMMIT it should know that eventually, it'll work: the rest of the cluster agreed it works there, and every node is ultimately doing the same commits, so if it's not working here it's because there is some prepared transaction creating a conflict. Accordingly, when a node gets a commit it can't apply immediately, it should just retry applying it after the transaction it conflicted with rolls back. Eventually, whatever caused it to fail the first time will eventually be rolled back, allowing this to move forward.

How exactly would this PREPARE function work, anwyay?

Chatting with the fine folks at SQLite, they suggest two API calls be added:

int sqlite3_prepare_commit(sqlite3 *db);

sqlite3_prepare_commit() must be called on a connection handle with an open BEGIN CONCURRENT transaction. It tests whether or not the transaction conflicts with any transaction that has already been committed to the database. In other words, whether or not an attempt to COMMIT the transaction at this time would fail with SQLITE_BUSY_SNAPSHOT.

int sqlite3_test_conflict(sqlite3 *db1, sqlite3 *db2);

Both database connections passed to sqlite3_test_conflict() must have open BEGIN CONCURRENT transactions. This function tests whether or not it would be possible to commit the transaction opened by db2 after the db1 transaction. In other words, whether or not the db2 transaction reads any database page written by the [db1] transaction.

Given this, in step 5 above, when B receives A3: BEGIN Andrew="UPDATE foo SET bar='Andrew' WHERE bar='Alex';" it would:

a. BEGIN CONCURRENT - Start a new transaction on the local database b. UPDATE foo SET bar='Andrew' WHERE bar='Alex'; - Execute the query in a transaction c. sqlite3_prepare_commit(db) - Test to see if this conflicts with any existing committed tranaction, which it doesn't d. sqlite3_test_conflict(db1, db1) - Loop across all outstanding "prepared" but uncommitted transactions; this would fail when it compares against UPDATE foo SET bar='Betty' WHERE bar='Alex';

This causes B to send B1: REJECT Andrew FOR Betty back to A. However B doesn't actually rollback the prepared transaction automatically: it waits for final word from A. And in this case, much to B's surprise, A says:

Hey B, I know you said you couldn't commit Andrew, but it turns out everyone else can. So, just a head's up, that Betty transaction you claim conflicts with Andrew is going to fail to get quorum; when you confirm that for yourself and roll it back, please go ahead and commit Andre.

Conclusion

Though this Gist is a bit complex to explain, this is really just explaining the complex interaction between some very simple rules. We would need to really prove that these rules in fact work -- but if they do, the actual code itself should be pretty straightforward.

@haikuginger
Copy link

haikuginger commented Dec 13, 2020

I really like this concept of vector timestamps a lot! I think expanding on a property you mention in passing gives you something much closer to total transaction ordering. You sort of throw this in as a neat side effect that allows you to avoid conflicts, but it's crucial in taking the approach even further:

Furthermore, I think this is true even if different quorums participate. For example, I think this is true even if quorum A,B,C approves one transaction while C,D,E approves another because the nature of quorum means every quorum group will always overlap -- in this case with C, who will evaluate both commits simultaneously and confirm that they are non-conflicting.

A neat side effect of the fact that we wait for a majority quorum to come to consensus is that every quorum will share at least one member with every previous quorum—any three items chosen from five items will share at least one item with any other three items chosen from those same five items. (We'll set aside for the moment that servers break down and are replaced and the system is grown and shrunk and so on.)

We can use that property further than you suggest. As written, each potential quorum member transmits its own scalar time to the potential committer. However, if each quorum member instead transmits what it understands to be the present vector time, we can do something interesting:

A0: BEGIN Alice
B0: APPROVE Alice [0, 0, 0, 0, 0]
D0: APPROVE Alice [0, 0, 0, 0, 0]
A1: COMMIT Alice [1, 0, 0, 0, 0]
C0: BEGIN Camilla
E0: APPROVE Camilla [0, 0, 0, 0, 0]
B0: APPROVE Camilla [1, 0, 0, 0, 0]
C1: COMMIT Camilla [1, 0, 1, 0, 0]
D0: BEGIN Diana
A1: APPROVE Diana [1, 0, 0, 0, 0]
E0: APPROVE Diana [1, 0, 1, 0, 0]
D1: COMMIT Diana [1, 0, 1, 1, 0]

And so on. By having quorum members retransmit what they each perceive to be the current vector timestamp based on the committed transactions they've heard of (each using the maximal scalar timestamp it's heard of for each other cluster member), and having the committer use the maximal received value for each vector component before incrementing its own scalar timestamp, you achieve near-total ordering of transactions.

Interestingly, all this has the side effect that the vector timestamp for a given transaction doesn't necessarily propagate everywhere until after a majority of the system has actually processed the COMMIT command containing the new vector timestamp, with the quorum holding the transaction pending and refusing conflicting transactions until then.

Anyway, the lack of total ordering isn't necessarily an issue in your original proposed protocol, but I could see it leading to issues where a semi-split network exists—e.g., A and B can talk to C, and D and E can talk to C, but A and B can't talk to D and E. A situation like that would lead to events that don't necessarily conflict with each other, but which also can't be ordered relative to each other without just relying on C to act as the arbiter of what came first; if C were to go down around when the network reconverges, you'd lose the information necessary to order the transactions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment