@jrwest

jrwest/ecweq1.md Secret

Last active October 19, 2023 15:54

The following fell out of a discussion I had w/ Engel Sanchez and Andrew Thompson. I'm going to attempt to discuss this case, generally, but the discussion was initially focused on Riak's new Cluster Metadata subsystem and Riak itself. So I'm sure some assumptions and details about the two will leak through.

Background

Assume we have a static set of nodes, n1,n2,..,nN, each of which is responsible for storing a replica, r1,r2,...,rN, of the value for key, k. Each replica is a list of values (siblings) and a logical clock. In the case of Riak, the set of nodes may be the owners in the preference list (primary or otherwise but for now we consider static members). In Cluster Metadata, the set of nodes is equivalent to all nodes in the cluster.

When an update to k is received by a node, it increments its counter in the logical clock and writes it along w/ the new value, locally, before returning to the client. All further replication is done asynchronously. This is the only option in Cluster Metadata and w=1 in Riak.

Siblings may be generated when concurrent updates are received by different nodes or when two updates occur on different sides of a network partition.

When reading k I'm going to assume for now that we only wait for a value from a single replica -- r=1 in Riak land and the way Cluster Metadata works. I suspect this applies no matter how many replicas we read but I haven't had time to think it through. Either way, the siblings are resolved and the new value is written back to the cluster. This write may or may not occur on the same node as the previous read and is treated like any other update. Specifically the logical clock is incremented -- the new value is a descendant of the siblings that were resolved.
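The replica model described above can be sketched in a few lines of Python. This is only an illustration, not Riak or Cluster Metadata code: the `Replica` class, `descends`, and the dict-based clock are hypothetical names, and `update` assumes the writer has just read (and resolved) the siblings on this same node, so it simply replaces them.

```python
from dataclasses import dataclass


def descends(a, b):
    """True if clock `a` has seen every event recorded in clock `b`."""
    return all(a.get(node, 0) >= count for node, count in b.items())


@dataclass
class Replica:
    node: str
    siblings: list
    clock: dict  # node id -> counter, like a version vector

    def update(self, value):
        """Local write: bump this node's counter and store the new value."""
        self.clock = {**self.clock, self.node: self.clock.get(self.node, 0) + 1}
        self.siblings = [value]
        # The returned copy is what would be replicated asynchronously.
        return list(self.siblings), dict(self.clock)

    def merge(self, siblings, clock):
        """Apply a replicated write to the local state."""
        if descends(self.clock, clock):
            return  # we already have this write or a descendant of it
        if descends(clock, self.clock):
            self.siblings, self.clock = list(siblings), dict(clock)
            return  # incoming write supersedes what we hold
        # Concurrent: keep both sides as siblings, take the pointwise max clock.
        self.siblings += [s for s in siblings if s not in self.siblings]
        self.clock = {n: max(self.clock.get(n, 0), clock.get(n, 0))
                      for n in self.clock.keys() | clock.keys()}


# Two concurrent updates on different nodes produce siblings after replication.
n1 = Replica("n1", ["a"], {"n1": 1, "n2": 1})
n2 = Replica("n2", ["a"], {"n1": 1, "n2": 1})
w1 = n1.update("b")
w2 = n2.update("c")
n2.merge(*w1)
n1.merge(*w2)
print(n1.siblings, n2.siblings, n1.clock == n2.clock)
```

After both merges each node holds both values as siblings under the same clock, which is the situation the rest of this note starts from.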

Initial State

In addition to the assumptions above, we start with each replica r1,r2,..,rN having the same value. The value has S siblings, [s1,s2,..,sS], and logical clock [c1,c2,...,cN] -- one counter per node, like a version vector, where the identifier is per node in the cluster, not per client (Riak's vnode_vclocks). The process of getting into this state is not a concern here; how difficult it is to get there is a separate conversation. What's important for this example is that we have siblings on a subset of nodes that, to start, all have the same logical clock. To keep this simple, among other reasons, I'll assume the same siblings are on all nodes to start and that all nodes have these siblings. Neither of these assumptions is a requirement.

Concurrent Resolution

The guts of the problem stem from what happens when we read these siblings concurrently. As outlined earlier, when we resolve and produce a new value we perform an update. This can lead to the further generation of siblings.

Take the following progression, assuming 3 nodes, n1,n2,n3 and 3 siblings s1,s2,s3 stored on each with clock [c1,c2,c3] (as outlined above):

  1. a client reads k from n1 and resolves siblings, [s1,s2,s3] producing s4, which is written back to n1. The resulting value stored at n1 is [s4] with logical clock [c1+1,c2,c3].
  2. a client reads k from n2 and resolves siblings, [s1,s2,s3] producing s5, which is written back to n2. The resulting value stored at n2 is [s5] with logical clock [c1,c2+1,c3].
  3. a client reads k from n3 and resolves siblings, [s1,s2,s3] producing s6, which is written back to n3. The resulting value stored at n3 is [s6] with logical clock [c1,c2,c3+1].
  4. async. replication of the value written to n1 reaches n2. After merging the replicated data the value at n2 is [s4,s5] and the clock is [c1+1,c2+1,c3]. The values at n1 and n3 remain unchanged. The merge of the data at n2 does not result in a new round of replication.
  5. async. replication of the value written to n2 reaches n3. After merging the replicated data the value at n3 is [s5,s6] with the resulting clock being [c1,c2+1,c3+1]. n2 still stores [s4,s5] and the associated clock, while n1's value continues to remain unchanged. The merge of the data at n3 does not result in a new round of replication.
  6. async. replication of the value written to n1 reaches n3. After merging the replicated data the value at n3 is [s4,s5,s6] with the resulting clock being [c1+1,c2+1,c3+1]. The values at n1 and n2 do not change from their previous values. The merge of the data at n3 does not result in a new round of replication.
  7. async. replication of the value written to n3 reaches n1. After merging the replicated data the value at n1 is [s4,s6] with the resulting clock being [c1+1,c2,c3+1]. The values at n2 and n3 do not change. The merge of the data at n1 does not result in a new round of replication.
  8. async. replication of the value written to n3 reaches n2. After merging the replicated data the value at n2 is [s4,s5,s6] with the resulting clock being [c1+1,c2+1,c3+1]. The values at n1 and n3 do not change. The merge of the data at n2 does not result in a new round of replication.
  9. async. replication of the value written to n2 reaches n1. After merging the replicated data the value at n1 is [s4,s5,s6] with the resulting clock being [c1+1,c2+1,c3+1]. The values at n2 and n3 do not change. The merge of the data at n1 does not result in a new round of replication.

At this point, if we assume no more reads or writes are performed during the interleaving above, we have reached a consistent state and no more replication messages will be flowing through the system. The final state is r1,r2,r3 all having the value [s4,s5,s6] with logical clock [c1+1,c2+1,c3+1]. This final state and the initial state have the same shape: three siblings under an identical clock on every node. One is just a causal descendant of the other. We could imagine a case where the above or a similar interleaving continues to occur over and over, leaving us in our initial state indefinitely.
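The nine steps can be replayed mechanically. The following is a minimal Python sketch under stated assumptions: clocks are stored as offsets from [c1,c2,c3] (so the starting clock is (0,0,0)), the merge rule is the ordinary version-vector rule (keep the descendant, otherwise union the siblings and take the pointwise max), and the function names are made up for this illustration.

```python
def descends(a, b):
    """True if clock a is pointwise >= clock b."""
    return all(x >= y for x, y in zip(a, b))


def merge(local, incoming):
    """Merge a replicated (siblings, clock) pair into the local pair."""
    (local_sibs, local_clock), (in_sibs, in_clock) = local, incoming
    if descends(local_clock, in_clock):
        return local
    if descends(in_clock, local_clock):
        return incoming
    sibs = sorted(local_sibs + [s for s in in_sibs if s not in local_sibs])
    return sibs, tuple(max(x, y) for x, y in zip(local_clock, in_clock))


def resolve(state, node, new_value):
    """Read at `node`, resolve its siblings to `new_value`, write back locally."""
    _, clock = state[node]
    bumped = tuple(c + 1 if i == node else c for i, c in enumerate(clock))
    state[node] = ([new_value], bumped)
    return state[node]  # the write that will be replicated later


# Nodes n1, n2, n3 are indexes 0, 1, 2.
state = {n: (["s1", "s2", "s3"], (0, 0, 0)) for n in range(3)}

w1 = resolve(state, 0, "s4")  # step 1
w2 = resolve(state, 1, "s5")  # step 2
w3 = resolve(state, 2, "s6")  # step 3

# Steps 4-9: (write, destination node) in the order given in the list.
for write, dest in [(w1, 1), (w2, 2), (w1, 2), (w3, 0), (w3, 1), (w2, 0)]:
    state[dest] = merge(state[dest], write)

for n, (sibs, clock) in state.items():
    print(f"n{n + 1}: {sibs} {clock}")
```

Every node ends with ['s4', 's5', 's6'] and clock (1, 1, 1): three siblings again, one step further along causally.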

Termination

If we can persist in this state indefinitely, the obvious question is whether we can escape it. An interleaving where the final state of r1,r2,..,rN is the same value with a single sibling would be one such way.

One such interleaving, assuming the same initial state, is shown below. In short, a round of replication completing before any other update of a resolved value will result in the desired end state.

  1. a client reads k from n1 and resolves siblings, [s1,s2,s3] producing s4, which is written back to n1. The resulting value stored at n1 is [s4] with logical clock [c1+1,c2,c3].
  2. async. replication of the value written to n1 reaches n2. After merging the replicated data the value at n2 is [s4] with logical clock [c1+1,c2,c3]. The values at n1 and n3 remain the same and the merge of the data at n2 does not result in a new round of replication.
  3. async. replication of the value written to n1 reaches n3. After merging the replicated data the value at n3 is [s4] with logical clock [c1+1,c2,c3]. The values at n1 and n2 do not change and the merge of the data at n3 does not result in a new round of replication.
  4. a client reads k from n2 and has no need to resolve siblings since the only one is s4.

The last step is ancillary but I mention it to round out the example. At the end of this interleaving, if no other operations are performed on the store, there will be no more messages flowing through the system.
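The terminating interleaving can be checked the same way. This is a sketch with the same assumptions as before (clock offsets starting at (0,0,0), plain version-vector merge, hypothetical helper names); the only difference is that replication of the first resolved write completes before anyone else resolves.

```python
def descends(a, b):
    """True if clock a is pointwise >= clock b."""
    return all(x >= y for x, y in zip(a, b))


def merge(local, incoming):
    """Keep the descendant; if concurrent, union siblings and max the clocks."""
    (local_sibs, local_clock), (in_sibs, in_clock) = local, incoming
    if descends(local_clock, in_clock):
        return local
    if descends(in_clock, local_clock):
        return incoming
    sibs = sorted(local_sibs + [s for s in in_sibs if s not in local_sibs])
    return sibs, tuple(max(x, y) for x, y in zip(local_clock, in_clock))


# Nodes n1, n2, n3 are indexes 0, 1, 2.
state = {n: (["s1", "s2", "s3"], (0, 0, 0)) for n in range(3)}

# Step 1: resolve at n1, producing s4 and bumping n1's counter.
state[0] = (["s4"], (1, 0, 0))
write = state[0]

# Steps 2 and 3: the write reaches n2 and n3 before any other resolution.
for dest in (1, 2):
    state[dest] = merge(state[dest], write)

# Step 4: a read at n2 finds a single sibling, so there is nothing to resolve.
siblings_at_n2, _ = state[1]
terminated = all(value == (["s4"], (1, 0, 0)) for value in state.values())
print(siblings_at_n2, terminated)
```

Because the incoming clock strictly descends the clock held at n2 and n3, the merge replaces their siblings instead of adding to them.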

Analysis

Since we can escape this undesirable state with some interleavings, persisting in it indefinitely requires that only interleavings that do not meet the termination condition flow through the system. Of course, this assumes you believe the above. I'd like to prove it formally, but for now that will have to wait. So assuming you believe, it then becomes interesting to analyze how likely this pathological case is, and I'm going to try to do some math now.

Let's make some more assumptions about our cluster. I'm going to evolve these as we go along with this analysis but let's start with the following:

  • We have a1,a2,...,aM application servers that act as clients for the store. In this case I'm thinking more about Riak but for the Cluster Metadata curious, let's say these clients may also invoke work that makes the store perform these read and update requests within the cluster (e.g. security storing some data in the store based on a request from one of the clients).
  • The network between the clients and the store is as fast as the network between the nodes -- they are in the same rack or whatever. This assumption should raise the likelihood of the pathological case.
  • Since we assume the networks are equally fast, let's assume it takes constant time to communicate between any client and any node in the store and between any two nodes. I'll refer to this as MessageTime.
  • Other work done by a node to facilitate an update or replicate a value is considered negligible -- we have perfect and infinitely fast disks/CPUs/memory/etc.

We can then define the time it takes for a client to send an update to a node and receive a successful response as:

UpdateTime = MessageTime + WriteTime + MessageTime
           = 2*MessageTime + WriteTime
           = 2*MessageTime    (WriteTime is negligible by the last assumption above)

Additionally, we can define the time it takes for an update to replicate to some or all nodes in the cluster. If we assume no concurrency while replicating asynchronously then we can define this as:

ReplicationTime = (N-1)*MessageTime

If we choose, say, 2 milliseconds as a value for UpdateTime then the MessageTime is 1 millisecond and the ReplicationTime is N-1 milliseconds. If we assume a single client and three nodes, then the time it takes to update and the time it takes to replicate are equal (2 milliseconds each). Now, if we throw away some of the above assumptions and consider the overhead in the client in addition to variability in networks, the likelihood of seeing a pathological interleaving indefinitely seems small (seems is not real numbers...).
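For anyone who wants to plug in their own numbers, the two formulas are trivial to evaluate. A small Python sketch follows; the function and constant names are hypothetical, and `write_time` defaults to zero to match the "infinitely fast disks" assumption.

```python
def update_time(message_time, write_time=0.0):
    """Request to the node plus response to the client, plus the local write."""
    return 2 * message_time + write_time


def replication_time(n, message_time):
    """Non-concurrent fan-out from one node to the other n - 1 nodes."""
    return (n - 1) * message_time


MESSAGE_TIME_MS = 1.0  # the 1 millisecond figure used in the text

update_ms = update_time(MESSAGE_TIME_MS)
replication_ms = {n: replication_time(n, MESSAGE_TIME_MS) for n in (2, 3, 5)}
print(update_ms, replication_ms)
```

With a 1 millisecond MessageTime this gives a 2 millisecond update and 1, 2 and 4 milliseconds of replication for 2, 3 and 5 nodes respectively.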

With 160 nodes or replicas in Cluster Metadata, the time to replicate to all of them would be about 7 milliseconds using the numbers above. I calculate this by considering the number of hops needed to fully replicate (measured as the "Last Distance Hop"). In the measurements so far this number grows by one hop each time the node count doubles (for now; it's known how we can make the curve better): for 10 nodes it was 3, for 20 nodes it was 4 and for 40 nodes it was 5, so extrapolating we get 7 for 160 nodes, which stands in for the node count in our ReplicationTime equation. Since we replicate concurrently between nodes, interchanging the node count for this hop count is safe given my assumptions.
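The extrapolation can be written down explicitly. The three quoted measurements go up by one hop per doubling of the node count, so the sketch below assumes that pattern simply continues; that model, and the names `MEASURED_HOPS` and `estimated_hops`, are my assumptions for illustration, not something measured beyond 40 nodes.

```python
import math

# Hop counts quoted in the text: node count -> "Last Distance Hop".
MEASURED_HOPS = {10: 3, 20: 4, 40: 5}


def estimated_hops(nodes):
    """Assumed model: 3 hops at 10 nodes, plus one hop per doubling."""
    return 3 + math.log2(nodes / 10)


# The model reproduces the three measured points.
fits = all(round(estimated_hops(n)) == hops for n, hops in MEASURED_HOPS.items())

hops_160 = round(estimated_hops(160))
MESSAGE_TIME_MS = 1.0
approx_replication_ms = hops_160 * MESSAGE_TIME_MS  # one MessageTime per hop
print(fits, hops_160, approx_replication_ms)
```

Under that assumption 160 nodes need 7 hops, or roughly 7 milliseconds at 1 millisecond per hop.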

If we assume a constant request rate and a 99% chance of hitting distinct nodes for each concurrent request then yes, it's possible in this cluster to never reach the termination condition.
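To get a rough feel for "possible", here is a back-of-the-envelope sketch. It assumes, purely for illustration, that each round of concurrent resolution independently has a 99% chance of being a non-terminating interleaving; the independence assumption and the function name are mine.

```python
def probability_still_stuck(p_bad_round, rounds):
    """Chance that every one of `rounds` independent rounds fails to terminate."""
    return p_bad_round ** rounds


P_BAD_ROUND = 0.99  # the 99% figure from the text, read as a per-round probability

after_100 = probability_still_stuck(P_BAD_ROUND, 100)
after_1000 = probability_still_stuck(P_BAD_ROUND, 1000)
print(f"{after_100:.3f} {after_1000:.2e}")
```

Under these assumptions the chance of still being stuck is about 0.37 after 100 rounds and on the order of 4 in 100,000 after 1000 rounds: never terminating is possible, but each additional round makes it less likely.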

Conclusion

The question then becomes: is this actually an issue, or was it just an interesting exercise that helps all of us better understand eventual consistency? I see it as the latter, although it's no more than conjecture without trying to hit the case in an existing code base or attempting to explore it more formally. However, I'll try to outline my reasoning since it's pretty short.

Applications that write data to an eventually consistent store, possibly resulting in siblings, need to account for conflict resolution. A smart conflict resolver is deterministic. Our pathological example did not result in something like sibling explosion (in fact the number of siblings is well bounded). If our concurrent resolutions each resolve to the same value, the siblings potentially produced from them are in turn simple to resolve, and so forth.

The Specific Case

This conversation came up in the context of Riak's security enable/disable feature, whose state is stored in Cluster Metadata. If the resolver were to use last-write-wins (the one built into Cluster Metadata), which we will call deterministic ignoring the potential randomness of clocks, and the initial siblings were {true, time1} and {false, time1+10seconds} then we would always pick the latter no matter which node we read from. That in turn might generate {false, time1+20seconds} and {false, time1+30seconds} on the two different nodes, which no matter what resolves to false (even if there are more siblings). So we can create siblings galore and still resolve to a correct value. If a user comes along and re-enables security and generates {true, time1+50seconds}, it may or may not take, however, given the pathological case. Again the odds of persisting in that state forever are debatable, but it's possible no matter how minuscule.
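A small sketch of that scenario follows. It is not the Cluster Metadata resolver: `lww_resolve` is a hypothetical last-write-wins function over (value, timestamp) pairs, timestamps are seconds relative to time1, and I am assuming that a resolved value is restamped with the time of the resolution, as the {false, time1+20seconds} example suggests.

```python
def lww_resolve(siblings, now):
    """Pick the sibling with the largest timestamp and restamp it with `now`."""
    value, _ = max(siblings, key=lambda sibling: sibling[1])
    return (value, now)


initial = [(True, 0), (False, 10)]

# Two nodes resolve the same siblings concurrently; both agree on False.
a = lww_resolve(initial, now=20)
b = lww_resolve(initial, now=30)
merged = lww_resolve([a, b], now=40)

# A user re-enables security at time1+50.
user_write = (True, 50)

# Case 1: the user's write only meets older resolutions, so it takes.
takes = lww_resolve([user_write, b], now=70)

# Case 2: another node resolves the stale siblings at time1+60, and that
# later-stamped result wins over the user's write.
stale = lww_resolve([a, b], now=60)
lost = lww_resolve([user_write, stale], now=70)

print(merged, takes, lost)
```

Case 1 ends with security enabled and case 2 ends with it disabled, which is one way the re-enable "may or may not take" depending on the interleaving.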
