Counters 2.0 in Cassandra vis-à-vis CRDTs.
The paper referred to below is "A comprehensive study of Convergent and Commutative Replicated Data Types",
available here: http://hal.inria.fr/docs/00/55/55/88/PDF/techreport.pdf
https://issues.apache.org/jira/browse/CASSANDRA-4775?focusedCommentId=13613693&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13613693
says that “The current counter implementation is based on the idea of internally keeping one separated
sub-counter (or "shard") for each replica of the counter, and making sure that for each increment, one
shard and only one is ever incremented.” So far, this matches the counter specified in the paper, but I
think there are several big differences as well.
Here’s how I understand the state-based increment/decrement counter from the paper to work
with 3 replicas r1,r2,r3:
r1 has {r1=[4,-4],r2=[3,0],r3=[5,-1]} and would say the counter is 7
r2 has {r1=[4,-3],r2=[4,0],r3=[5,-1]} and would say the counter is 9
r3 has {r1=[4,-4],r2=[3,0],r3=[5,-1]} and would say the counter is 7
Client A tells r1 to increment by 100:
r1 now has {r1=[104,-4],r2=[3,0],r3=[5,-1]} which it sends to r2 and r3.
r2 may now have {r1=[104,-4],r2=[4,0],r3=[5,-1]} and say the counter is 108
r3 may now have {r1=[104,-4],r2=[3,0],r3=[5,-1]} and say the counter is 107
Client B tells r2 to decrement by 10:
r2 now has {r1=[4,-3],r2=[4,-10],r3=[5,-1]} which it sends to r1 and r3.
r1 may now have {r1=[4,-4],r2=[4,-10],r3=[5,-1]} and say the counter is -2
r3 may now have {r1=[4,-4],r2=[4,-10],r3=[5,-1]} and say the counter is -2
Client C reading the counter from r3 while A and B execute concurrently could see:
1) r3 says it has {r1=[4,-4],r2=[3,0],r3=[5,-1]} and the counter is 7
2a) r3 says it has {r1=[104,-4],r2=[3,0],r3=[5,-1]} and the counter is 107
2b) r3 says it has {r1=[4,-4],r2=[4,-10],r3=[5,-1]} and the counter is -2
3) r3 says it has {r1=[104,-4],r2=[4,-10],r3=[5,-1]} and the counter is 98
Client A can retry the increment by 100 by reading the payload {r1=[104,-4],r2=[3,0],r3=[5,-1]}
from r1 and then sending that payload directly to r2 and r3 until it succeeds.
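To make the merge and value rules above concrete, here is a minimal Python sketch of the state-based
PN-Counter as I read it from the paper, reusing the numbers from the example; the class and method names
are only for illustration and come from neither the paper nor Cassandra:

    # Minimal sketch of a state-based PN-Counter: one [positive, negative] pair per replica.
    class PNCounter:
        def __init__(self, replica_id, replica_ids):
            self.replica_id = replica_id
            # payload: replica id -> [sum of increments, sum of decrements stored as a negative number]
            self.shards = {r: [0, 0] for r in replica_ids}

        def increment(self, amount):
            self.shards[self.replica_id][0] += amount   # only the owned shard is ever touched

        def decrement(self, amount):
            self.shards[self.replica_id][1] -= amount   # the negative side only grows in magnitude

        def value(self):
            return sum(p + n for p, n in self.shards.values())

        def merge(self, other_shards):
            # Component-wise: positives converge to the maximum, negatives to the minimum
            # (largest magnitude), so a merge never loses or double-counts an update.
            for r, (p, n) in other_shards.items():
                mine = self.shards.setdefault(r, [0, 0])
                mine[0] = max(mine[0], p)
                mine[1] = min(mine[1], n)

    # Reproducing the example above: r1 and r3 start from the states listed.
    r1 = PNCounter("r1", ["r1", "r2", "r3"])
    r1.shards = {"r1": [4, -4], "r2": [3, 0], "r3": [5, -1]}   # r1 says the counter is 7
    r3 = PNCounter("r3", ["r1", "r2", "r3"])
    r3.shards = {"r1": [4, -4], "r2": [3, 0], "r3": [5, -1]}   # r3 says the counter is 7

    r1.increment(100)        # r1 now has {r1=[104,-4], r2=[3,0], r3=[5,-1]}
    r3.merge(r1.shards)      # after receiving r1's payload, r3 says the counter is 107
    assert r1.value() == 107 and r3.value() == 107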
Back to Cassandra: if I understand correctly, shards are the numbers 4, 3, 5 in an increment-only
counter {r1=[4],r2=[3],r3=[5]}. The first big difference is that, to avoid reading the value of the owned
shard, Cassandra’s replicas sometimes (always? eventually?) create new shards instead of incrementing
the one they already have. Example:
Client A tells r1 to increment by 100: r1 has {r1=[4 and 100],r2=[3],r3=[5]} instead of
{r1=[104],r2=[3],r3=[5]}
Instead of a single shard, r1 has a set of shards, and Client A effectively told r1 to add a new shard
with the initial value 100 to its set of shards. Retrying this operation would be counter-productive
on principle, because the goal was to increment an existing shard in the first place.
Another difference is that shards include timestamps, which are used when merging shards instead of
merging based on the values in the shards. Yet another difference is that shards are merged differently
depending on which replica is merging which shards. Continuing the last example:
r1 has {r1=[4 and 100],r2=[3],r3=[5]} which it merges into {r1=[104],r2=[3],r3=[5]} and r1 would say the
counter is 112. If due to a topology change r2 ends up with r1's {r1=[4 and 100],r2=[3],r3=[5]}, r2 merges
that into {r1=[100],r2=[3],r3=[5]} and r2 would say the counter is 108. Both of these cases are problematic.
r1 increments one of the shards but essentially decrements the other to zero when the owner is only
supposed to increment. r2 essentially decrements to zero a shard it doesn’t own.
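As a toy illustration of that divergence (the timestamps, shard layout, and function names below are
invented for the example; this is not Cassandra's actual code), the same pair of r1-owned shards merged
by the owner and by a non-owner gives two different totals:

    # After client A's increment, model r1's entry as a set of (timestamp, delta) shards.
    r1_entry = [(1, 4), (2, 100)]             # the old shard 4 and the new shard 100

    def merge_as_owner(shards):
        # The owner collapses its own shards by summing them: [4 and 100] -> 104.
        return sum(delta for _, delta in shards)

    def merge_as_non_owner(shards):
        # A non-owner keeps the shard with the highest timestamp: [4 and 100] -> 100.
        return max(shards)[1]

    other_shards = 3 + 5                                  # r2's and r3's shards
    print(merge_as_owner(r1_entry) + other_shards)        # 112, what r1 would say
    print(merge_as_non_owner(r1_entry) + other_shards)    # 108, what r2 would say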
Whew! Here’s how I think this could be changed to work like a state-based PN-Counter (which combines two
counters, Positive and Negative, into one shard containing both values), unless I’ve horrifically
misunderstood something:
1) The buffering and bulk processing of incoming requests isn’t specific to counters, so the unprocessed
increments don’t need to be stored in the counter; instead, they should be inserted into a work queue
that is separate from the counter and belongs only to the replica. This way unprocessed increments won’t
get propagated to other replicas. When the replica needs to calculate the value of the counter, it processes
the work queue and increments its shards as normal, with all the locking and writing. (A sketch of how
1–3 fit together follows after point 3.)
2) To propagate the incremented counter, the replica can create a list of all (or some) of the replicas
holding the counter and insert an exact duplicate shard for each of them to receive. The new shards are
transmitted to the other replicas as before.
3) Resolving two shards that are owned by the same replica is changed to happen the same way on every replica.
To ensure a shard is never decremented, the node always keeps the shard with the highest absolute value and
ignores timestamps. Last-Write-Wins must not be allowed to affect the convergence of the values in shards;
I’m not sure how big a threat this is in practice.
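Here is a rough sketch of how I imagine 1)–3) fitting together; the class name, the queue representation
and the operation ids are placeholders, not a concrete Cassandra design:

    # Sketch of the proposal: a grow-only work queue kept outside the counter, shard
    # duplication for propagation, and a merge rule that ignores timestamps entirely.
    class ProposedReplica:
        def __init__(self, replica_id):
            self.replica_id = replica_id
            self.shards = {replica_id: [0, 0]}   # replica id -> [positive, negative]
            self.work_queue = set()              # 1) grow-only set, local to this replica only
            self.applied = set()

        def enqueue(self, op_id, delta):
            # Unprocessed increments live here, never in the counter itself,
            # so they are not propagated to other replicas.
            self.work_queue.add((op_id, delta))

        def process_queue(self):
            # Processing the queue increments the owned shard as normal.
            for op_id, delta in self.work_queue - self.applied:
                side = 0 if delta >= 0 else 1
                self.shards[self.replica_id][side] += delta
                self.applied.add((op_id, delta))

        def snapshot(self):
            # 2) Propagation ships exact duplicates of the current shards.
            return {r: list(pn) for r, pn in self.shards.items()}

        def merge(self, incoming):
            # 3) The same rule on every replica: keep the highest absolute value per
            # component and ignore timestamps, so a shard is never decremented.
            for r, (p, n) in incoming.items():
                mine = self.shards.setdefault(r, [0, 0])
                mine[0] = max(mine[0], p)
                mine[1] = min(mine[1], n)

        def value(self):
            return sum(p + n for p, n in self.shards.values())

    r1 = ProposedReplica("r1")
    r1.enqueue("client-A-op-1", 100)
    r1.enqueue("client-A-op-1", 100)   # a retried enqueue is a no-op in a grow-only set
    r1.process_queue()                 # only now does the owned shard change
    assert r1.value() == 100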
With this, some implementation issues still remain: changing the owner of shards to another replica,
decommissioning a node, garbage-collecting a shard, deleting a counter, and client-retryable operations:
- To change the owner of a shard from replica r3, which has {r1=[104,-4],r2=[4,-10],r3=[5,-1]}, to replica r4,
a coordinator node sets a flag marking the old shard in r3 as garbage; r3 can, but should not, receive new
clients after this. It then sets up a new replica on r4 and moves the difference between r3's positive and
negative values there: r3 is told to decrement by 4 and r4 is told to increment by 4, with the end result being:
r3 has {r1=[104,-4],r2=[4,-10],r3=[5,-5]&garbage} and r4 has {r1=[0,0],r2=[0,0],r3=[0,0],r4=[4,0]}
These converge to {r1=[104,-4],r2=[4,-10],r3=[5,-5]&garbage,r4=[4,0]} eventually (see the ownership-transfer
sketch after this list).
- To decommission a node, a coordinator node assumes ownership of the shard owned by the old replica and
moves its value to an existing replica, otherwise proceeding as in the ownership change above.
- To garbage-collect a shard that it has previously marked as garbage, a coordinator node again moves the
difference between the positive and negative values to another replica. After that, the old shard and its
copies can be removed.
- To delete a counter, a coordinator node marks all shards as garbage and transfers the differences from all
replicas to itself. It can then resurrect the counter with a new replica if, at garbage-collection time,
the counter doesn't add up to zero.
- To give clients the ability to retry an operation, clients need to be able to confirm that
a tuple {replicaId,clientId,oldCounterValue,newCounterValue} has been added to the replica's work queue.
The operation to add this tuple is idempotent if the work queue is implemented as a grow-only set, so
it can be retried until confirmed. After receiving confirmation the client reads the value of the counter
to receive an updated value with the tuple applied; this operation is idempotent as well. Once that has
completed successfully, the client can ask the replica to propagate the updated value until a quorum is
reached; this also makes the earlier tuple garbage (see the work-queue sketch after this list).
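Two small sketches of the bullets above. First, the ownership transfer, using the numbers from the example
(the garbage flag is modelled here as a plain set of replica ids, which is only a stand-in):

    # Moving shard ownership from r3 to r4.
    r3_state = {"r1": [104, -4], "r2": [4, -10], "r3": [5, -1]}   # counter value 98

    garbage = {"r3"}                                # coordinator marks r3's own shard garbage
    diff = r3_state["r3"][0] + r3_state["r3"][1]    # 5 + (-1) = 4

    r3_state["r3"][1] -= diff                       # r3 is told to decrement by 4 -> [5, -5]
    r4_state = {"r1": [0, 0], "r2": [0, 0], "r3": [0, 0], "r4": [diff, 0]}   # r4 increments by 4

    # After the two states converge, the counter still adds up to 98.
    all_ids = set(r3_state) | set(r4_state)
    merged = {r: [max(r3_state.get(r, [0, 0])[0], r4_state.get(r, [0, 0])[0]),
                  min(r3_state.get(r, [0, 0])[1], r4_state.get(r, [0, 0])[1])]
              for r in all_ids}
    assert sum(p + n for p, n in merged.values()) == 98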
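Second, the client-retryable operation, with the grow-only work queue modelled as a plain set (the function
and argument names are placeholders):

    # Adding the same operation tuple twice is a no-op, so the client can retry until confirmed.
    work_queue = set()

    def add_operation(replica_id, client_id, old_value, new_value):
        op = (replica_id, client_id, old_value, new_value)
        work_queue.add(op)          # idempotent: a retry never double-counts
        return op in work_queue     # the confirmation the client waits for

    assert add_operation("r1", "client-A", 7, 107)
    assert add_operation("r1", "client-A", 7, 107)   # retried
    assert len(work_queue) == 1                      # still exactly one tuple queued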