Counters 2.0 in Cassandra vis-à-vis CRDTs.
The paper referred to below is "A comprehensive study of Convergent and Commutative Replicated Data Types",
available here: http://hal.inria.fr/docs/00/55/55/88/PDF/techreport.pdf
https://issues.apache.org/jira/browse/CASSANDRA-4775?focusedCommentId=13613693&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13613693
says that “The current counter implementation is based on the idea of internally keeping one separated
sub-counter (or "shard") for each replica of the counter, and making sure that for each increment, one
shard and only one is ever incremented.” So far, this matches the counter specified in the paper, but I
think there are several big differences as well.
Here’s how I understand the state-based increment/decrement counter from the paper to work
with 3 replicas r1, r2, r3:

r1 has {r1=[4,-4],r2=[3,0],r3=[5,-1]} and would say the counter is 7
r2 has {r1=[4,-3],r2=[4,0],r3=[5,-1]} and would say the counter is 9
r3 has {r1=[4,-4],r2=[3,0],r3=[5,-1]} and would say the counter is 7

Client A tells r1 to increment by 100:
r1 now has {r1=[104,-4],r2=[3,0],r3=[5,-1]} which it sends to r2 and r3.
r2 may now have {r1=[104,-4],r2=[4,0],r3=[5,-1]} and say the counter is 108
r3 may now have {r1=[104,-4],r2=[3,0],r3=[5,-1]} and say the counter is 107

Client B tells r2 to decrement by 10:
r2 now has {r1=[4,-3],r2=[4,-10],r3=[5,-1]} which it sends to r1 and r3.
r1 may now have {r1=[4,-4],r2=[4,-10],r3=[5,-1]} and say the counter is -2
r3 may now have {r1=[4,-4],r2=[4,-10],r3=[5,-1]} and say the counter is -2
Client C reading the counter from r3 while A and B execute concurrently could see:
1) r3 says it has {r1=[4,-4],r2=[3,0],r3=[5,-1]} and the counter is 7
2a) r3 says it has {r1=[104,-4],r2=[3,0],r3=[5,-1]} and the counter is 107
2b) r3 says it has {r1=[4,-4],r2=[4,-10],r3=[5,-1]} and the counter is -2
3) r3 says it has {r1=[104,-4],r2=[4,-10],r3=[5,-1]} and the counter is 98
Client A can retry the increment by 100 by reading the payload {r1=[104,-4],r2=[3,0],r3=[5,-1]}
from r1 and then sending that payload directly to r2 and r3 until it succeeds.
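The merge and value rules in the example above can be sketched in a few lines of Python. This is an illustrative sketch of the paper's state-based PN-Counter using this document's notation (the decrement half of each pair is stored as a negative number), not any real implementation:

```python
# Sketch of the state-based PN-Counter from the paper, in the example's
# notation: each replica id maps to an (increments, decrements) pair, with
# decrements stored as a negative number. Merge takes each pair element-wise
# "furthest from zero"; the counter's value is the sum of all entries.

def merge(a, b):
    out = {}
    for rid in set(a) | set(b):
        pa, na = a.get(rid, (0, 0))
        pb, nb = b.get(rid, (0, 0))
        out[rid] = (max(pa, pb), min(na, nb))  # increments only grow, decrements only shrink
    return out

def value(state):
    return sum(p + n for p, n in state.values())

r1 = {"r1": (4, -4), "r2": (3, 0), "r3": (5, -1)}
r2 = {"r1": (4, -3), "r2": (4, 0), "r3": (5, -1)}
print(value(r1), value(r2))  # 7 9

# Client A increments by 100 at r1; Client B decrements by 10 at r2:
r1["r1"] = (r1["r1"][0] + 100, r1["r1"][1])
r2["r2"] = (r2["r2"][0], r2["r2"][1] - 10)

# Once both payloads have propagated, every replica converges to the same state:
print(value(merge(r1, r2)))  # 98
```

Note that merge is commutative, associative, and idempotent, which is why the retry in the last paragraph is safe: re-delivering the same payload cannot change the converged result.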
Back to Cassandra: if I understand correctly, shards are the numbers 4, 3, 5 in an increment-only
counter {r1=[4],r2=[3],r3=[5]}. The first big difference is that, to avoid reading the value of the owned
shard, Cassandra’s replicas sometimes (always? eventually?) create new shards instead of incrementing
the one they already have. Example:

Client A tells r1 to increment by 100: r1 has {r1=[4 and 100],r2=[3],r3=[5]} instead of
{r1=[104],r2=[3],r3=[5]}

Instead of a single shard, r1 has a set of shards, and Client A effectively told r1 to add a new shard
with the initial value 100 to its set of shards. Retrying this operation would be counter-productive
on principle, because the goal was to increment an existing shard in the first place.
Another difference is that shards include timestamps, which are used when merging shards instead of
merging based on the value in the shard. Yet another difference is that shards are merged differently
depending on which replica is merging which shards. Continuing the last example:

r1 has {r1=[4 and 100],r2=[3],r3=[5]}, which it merges into {r1=[104],r2=[3],r3=[5]}, and r1 would say the
counter is 112. If due to a topology change r2 ends up with r1's {r1=[4 and 100],r2=[3],r3=[5]}, r2 merges
that into {r1=[100],r2=[3],r3=[5]} and r2 would say the counter is 108. Both of these cases are problematic:
r1 increments one of the shards but essentially decrements the other to zero, when the owner is only
supposed to increment; r2 essentially decrements to zero a shard it doesn’t own.
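A toy simulation of that asymmetry makes the 112-vs-108 divergence concrete. This models my reading of the described behavior, not Cassandra's actual code, and the timestamps are hypothetical:

```python
# Toy model of the asymmetric merge described above: the replica that owns a
# shard sums its sub-shards, while any other replica keeps only the sub-shard
# with the newest timestamp. My reading of the behavior, not Cassandra code.

def merge_sub_shards(sub_shards, merging_replica, owner):
    # sub_shards: list of (timestamp, value) pairs for one logical shard
    if merging_replica == owner:
        return sum(v for _, v in sub_shards)  # owner adds its sub-shards up
    return max(sub_shards)[1]                 # non-owner keeps the last write

r1_sub_shards = [(1, 4), (2, 100)]            # hypothetical timestamps
print(merge_sub_shards(r1_sub_shards, "r1", "r1") + 3 + 5)  # r1 says 112
print(merge_sub_shards(r1_sub_shards, "r2", "r1") + 3 + 5)  # r2 says 108
```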
Whew! Here’s how I think this could be changed to work like a state-based PN-Counter (which combines two
counters, Positive and Negative, with one shard containing both values), unless I’ve horrifically
misunderstood something:

1) The buffering and bulk processing of incoming requests isn’t specific to counters, so the unprocessed
increments don’t need to be stored in the counter; instead they should be inserted into a work queue
that's separate from the counter and belongs only to the replica. This way unprocessed increments won’t
get propagated to other replicas. When the replica needs to calculate the value of the counter, it processes
the work queue and increments its shards as normal, with all the locking and writing.
2) To propagate the incremented counter, the replica can create a list of all (or some of) the shards that
are replicating the counter and insert an exact duplicate shard for each of them to receive. The new
shards are transmitted to other replicas as before.

3) Resolving two shards that are owned by the same replica is changed to happen the same way on every replica.
To ensure a shard is never decremented, the node always keeps the shard with the highest absolute value and
ignores timestamps. Last-Write-Wins must not be allowed to affect the convergence of the values in shards.
I’m not sure how big of a threat this is in practice.
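Fix (3) could look something like this sketch: both copies of a replica's shard resolve by the same rule on every replica, and timestamps play no part, so the outcome no longer depends on who performs the merge:

```python
# Sketch of fix (3): resolve two copies of the same replica's shard by keeping
# the value furthest from zero, ignoring timestamps entirely. Applied
# separately to the positive and negative halves of a PN-Counter shard, this
# can only ever grow a shard's magnitude, never decrement it.

def resolve(copy_a, copy_b):
    return copy_a if abs(copy_a) >= abs(copy_b) else copy_b

print(resolve(104, 4))   # 104: the incremented positive sub-counter wins
print(resolve(-3, -10))  # -10: same rule applied to the negative sub-counter
```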
With this, some implementation issues still remain: changing the owner of shards to another replica,
decommissioning a node, garbage-collecting a shard, deleting a counter, and client-retryable operations:
- To change the owner of a shard from replica r3, which has {r1=[104,-4],r2=[4,-10],r3=[5,-1]}, to replica r4,
a coordinator node sets a flag marking the old shard in r3 garbage; r3 can, but should not, receive new clients
after this. It then sets up a new replica on r4 and moves the difference of r3's positive and negative values
there: r3 is told to decrement by 4 and r4 is told to increment by 4, with the end result being:
r3 has {r1=[104,-4],r2=[4,-10],r3=[5,-5]&garbage} and r4 has {r1=[0,0],r2=[0,0],r3=[0,0],r4=[4,0]}
These converge to {r1=[104,-4],r2=[4,-10],r3=[5,-5]&garbage,r4=[4,0]} eventually.
- To decommission a node, a coordinator node assumes ownership of the shard owned by the old replica and
moves its value to an existing replica, otherwise proceeding as above when changing ownership.
- To garbage-collect a shard that it has previously marked garbage, a coordinator node again moves the
difference of the positive and negative values to another replica. After that the old shard and its copies
can be removed.
- To delete a counter, a coordinator node marks all shards garbage and transfers the differences from all replicas
to itself. It can then resurrect the counter with a new replica if, at garbage-collection time, the counter
doesn't add up to zero.
- To give clients the ability to retry an operation, clients need to be able to confirm that
a tuple {replicaId,clientId,oldCounterValue,newCounterValue} has been added to the replica's work queue.
The operation to add this tuple is idempotent if the work queue is implemented as a grow-only set, so
it can be retried until confirmed. After receiving confirmation, the client reads the value of the counter
to receive an updated value with the tuple applied; this operation is idempotent as well. Once that has
completed successfully, the client can ask the replica to propagate the updated value until quorum; this
also makes the earlier tuple garbage.
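The retry scheme in the last bullet can be sketched as follows. All names here (WorkQueue, enqueue, drain) are illustrative, not an existing Cassandra API:

```python
# Sketch of the client-retryable work queue: a grow-only set of
# (replicaId, clientId, oldCounterValue, newCounterValue) tuples. Adding the
# same tuple twice is a no-op, so a client can safely retry until confirmed.
# All names are illustrative; this is not an existing Cassandra API.

class WorkQueue:
    def __init__(self):
        self._entries = set()  # grow-only set: add is idempotent

    def enqueue(self, replica_id, client_id, old_value, new_value):
        self._entries.add((replica_id, client_id, old_value, new_value))
        return True  # confirmation to the client that the tuple is queued

    def drain(self):
        """Turn pending tuples into shard increments; they become garbage after."""
        deltas = [new - old for (_, _, old, new) in self._entries]
        self._entries.clear()
        return deltas

q = WorkQueue()
q.enqueue("r1", "clientA", 7, 107)  # first attempt
q.enqueue("r1", "clientA", 7, 107)  # retry: absorbed by the set
print(q.drain())  # [100], applied exactly once
```

The grow-only set is what makes the retry safe: a duplicate enqueue after a lost confirmation changes nothing, so the increment is applied exactly once when the queue is drained.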