Riak 1.4 included counters. This was a break from the norm in Riak. We’ve always said “Your data is opaque to Riak”, but with counters that isn’t true. Riak knows what you’re storing against a counter key, and how to increment and decrement it. You tell Riak this with the counter API. You never fetch, mutate, and put a counter; you just say “increment by 5” or “decrement by 100”. You never send a vclock, and, most importantly, Riak knows how to merge concurrent writes to a counter. You never see siblings for a counter; you always see a single value. No writes are lost, and eventually the counter will reflect all writes and reach a consistent value.
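For illustration, here is roughly what that looks like from the Erlang PB client (riakc). This is a sketch only: the bucket and key names are made up, and the bucket is assumed to have allow_mult set to true, which 1.4 counters require.

```erlang
%% Sketch only: the 1.4-era counter calls in the riak-erlang-client (riakc).
%% Bucket and key names are illustrative.
{ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087),
ok = riakc_pb_socket:counter_incr(Pid, <<"page_hits">>, <<"home">>, 5),    %% "increment by 5"
ok = riakc_pb_socket:counter_incr(Pid, <<"page_hits">>, <<"home">>, -100), %% "decrement by 100"
{ok, Value} = riakc_pb_socket:counter_val(Pid, <<"page_hits">>, <<"home">>).
```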
Counters are OK, but you can’t build many applications on just counters. For Riak 2.0 we’ve added some more data types. We believe that with the addition of these data types you can model many applications’ data storage needs with greater simplicity, and never have to write sibling merge functions again.
When we’ve talked in the past about adding data types to Riak, we’ve spoken about CRDTs. CRDT stands for (variously) Conflict-free Replicated Data Type, Convergent Replicated Data Type, Commutative Replicated Data Type, and maybe others. The key, repeated phrase shows that we’re dealing with Replicated Data Types.
Replication is normal for Riak; it is what the N-Value defines. Data Types are pretty common in computing: Sets, Bags, Lists, Registers, Maps, Counters, etc. That leaves us with the C to deal with.
State based, or Convergent, CRDTs were thought up 15 years ago by Carlos Baquero and Francisco Moura[1] as an application of something called a Join Semi-Lattice: a triple of a Partially Ordered Set, a Least Element (Bottom, hehehehehe), and a function that produces a Least Upper Bound. This function must be idempotent, associative, and commutative, and when applied to two elements of the Partially Ordered Set it returns a merged, or least upper bound, element that is also in the Partially Ordered Set.
But what does that mean, and how is it useful?
Riak is an eventually consistent system. It leans very much toward the AP end of the CAP spectrum. We achieve this Availability with things like sloppy quorum writes to fallback nodes. However, even without partitions and many nodes, interleaved or concurrent writes can lead to conflicts. Traditionally Riak keeps all values and presents them to the user to resolve. The client application must have a deterministic way to resolve conflicts. It might be to pick the highest timestamp, or union all the values in a list, or something more complex. Whatever it does, it is ad hoc, created specifically for the data model and application at hand. But it must look just like that LUB function of the Join Semi-Lattice: it must be idempotent, commutative, and associative.
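To make those three properties concrete, here is a tiny sketch in plain Erlang using the standard ordsets module. Set union behaves exactly like such a least-upper-bound function, so it does not matter how many times, or in what order, two replicas’ states are merged.

```erlang
-module(lub_demo).
-export([check/0]).

%% Set union is a least upper bound: idempotent, commutative, and associative,
%% so the order (and repetition) of merges never changes the result.
check() ->
    A = ordsets:from_list([item_a]),
    B = ordsets:from_list([item_b]),
    C = ordsets:from_list([item_a, item_c]),
    true = ordsets:union(A, A) =:= A,                      %% idempotent
    true = ordsets:union(A, B) =:= ordsets:union(B, A),    %% commutative
    true = ordsets:union(A, ordsets:union(B, C)) =:=
               ordsets:union(ordsets:union(A, B), C),      %% associative
    ok.
```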
In Riak, the Conflict-free C is kind of a lie. There’s still conflict; it is just that the resolution is part of the data type’s design. Of all the C’s above, Convergent is the one that matters to us. The data types we’ve created for Riak converge automatically, at write and read time, on the server. If a client application can model its data using the data types we provide, it will never see sibling values or need to write ad hoc, custom merge functions.
If you’re storing data in Riak and you have allow_mult set to true, then you need to handle conflicting writes. If you have to handle them in your code, you have to either simplify your data model or write complex merge functions.
The classic example from the Dynamo paper[2] is the Amazon shopping cart. A single user adds two items to their cart, and the two adds hit different partitioned servers. There are now two carts: one with item A and one with item B. The merge logic is simple: union the carts to get a single correct value. But what of removes? How can you tell if item A is absent from cart two because it was added to cart one and not yet seen, or because it was removed from cart two and cart one was unaware?
Maybe your application is slightly more complex: you need to add a count of how many of each item are in the cart. Now, when presented with 6 instances of “hairbrush” in one sibling and 4 in another, your merge function has to decide whether the user added 6 and then removed 2, or added 4 and then added 2 more. Maybe you start to add tombstones, or record operations (like StateBox.) But quickly it gets complex. And if you need to record more information, or evolve your data model, your merge function must grow, adapt, and work on different versions of your data. It becomes very complex very quickly.
Every new data model needs new ad hoc merge functions. When you come to store some other data in Riak, say user profiles, you have to start from scratch, building up your data model and merge logic again.
When modeling an application’s domain in a programming language developers are used to composing state from a few primitive data types, like Sets, Maps, Registers, Integers, Booleans etc. Data Types in Riak give the developer back that power and expressivity, and relieve them of the burden of designing and testing deterministic merge functions. The key is that the data is no longer opaque to Riak. When you use the Data Types API Riak “knows” what type of thing you are storing and is able to perform that semantic merge for you. Riak already detects conflicts, with Data Types it is able to merge them too.
When reading a Data Type value you will only ever see a single value. That value is still Eventually Consistent, but it will be as correct as it can be given the amount of entropy in the database, and when the system is quiescent, all values will converge on a single, deterministic, correct value.
We have some Data Types that you can store against a Key in Riak, which we’re calling Top Level Types:
- Counters: As in Riak 1.4.
- Sets: Collections of things. In Riak we expect you to store binaries, which is how we encode strings of text in Erlang. The kind of thing you’d store in a Set might be the members of a team or department, followers on a social network, or maybe objects in some real-world collection.
- Maps: A Map is a way to compose Data Types into a richer, more complex structure. A Map is a collection of fields; a field is a name and Data Type pair. This is so that we don’t have to deal with merging fields of different types: if two fields with the same name but different types are added to a Map, then they’re two different fields. You may only store Data Types in a Map.
You can store any of the top level types in a field of a Map, including another Map. We’ve also added two Map-only types:
- Registers: A binary value. It might be an email address, or a first name.
- Flags: A Boolean.
The semantics of each of these data types differ from those of their regular, linear counterparts, though we think the semantics we have chosen are the most intuitive and useful, and the least surprising.
Since we’re storing our data types in Riak (in riak_objects, even), all the trade-offs of Eventual Consistency apply (except, of course, conflict resolution.) That means that Counters are not for creating unique, ordered IDs, and Sets and Maps do not have the atomic and blocking operations of their Redis counterparts.
Nothing has changed for counters. They’re still not idempotent. If Riak returns an Error for a counter operation, it may have only partially failed, and a retry may lead to a double count. However, adding an element to a Set, or a Field to a Map is idempotent.
The semantic we’ve chosen for the Set and Map is “Add Wins”. The literature also calls this “Observed Remove” but that is an implementation detail of how the Add Wins (and I’ll cover it below.)
When any pair of operations on a Set are concurrent, and one adds an element, while the other removes it, the add wins. If the remove causally follows the Add, then the Remove is effective. Concurrent operations on different elements work as you’d expect.
The Map borrows its behaviour directly from the Set, except that every time you update the contents of a field (say, increment the counter in the “likes” field, or add a buddy to the “follows” field) that counts as “adding” the field. This way, a concurrent removal of a field and an update to that field will see the update win. Add wins again.
The difficult question to answer is what the value of a field should be when it is concurrently updated and removed. The answer is that the update wins, the field remains, and its value is the merge of all the surviving replicas’ values.
Say a counter in a Map field is incremented to 5 at replica A and replicated to B and C. Concurrently, the counter field is removed at A and incremented by 3 at C; the merged value will be 8. That is to say, the remove at A does not reverse all of A’s previous actions.
There is something surprising and imperfect about this semantic. If A had incremented the counter by 2 after it was partitioned from C but before it removed the counter field, that update is lost. Only the values that B and C have seen for A will remain. Removes are tricky.
There is another odd edge case arising from removes that may also be surprising. Imagine that Replica A coordinates a removal of a Set Field from a Map while, concurrently, Replica B coordinates the removal of all elements from that same Set. As per the rules above, field updates count as “Adds” (for the add wins semantic), so the Field remains in the Map, albeit as an empty Set.
These Map-only types have simple semantics. The Register is Last Write Wins, using a timestamp on the node handling the write; all the caveats about clock synchronization therefore apply. Flags start out Off, and you can turn them On. For any pair of concurrent, conflicting operations (On | Off), On wins. Again, that same Add Wins semantic.
More on this later, but all additive operations (that is everything except Set member and Map field removal) can be performed by simply sending operations to Riak. Not the usual Get, Modify, Update cycle. This “action at a distance” was introduced in Riak 1.4 with counters and extends to the new data types.
As ever, there are two APIs, PB and HTTP. At the time of writing the HTTP API is unfinished, so I’ll talk about the PB API. Assume that there will be parity.
The API allows you to specify operations to be performed on Data Types at a replica in Riak. For counters you may only send a single operation, “increment” with an amount (negative for decrement.)
For Sets, you may send a list of operations. The list may contain both Add Element X and Remove Element Y operations. If you are removing elements, we strongly recommend that you first fetch the Set and its context, and send the context with the remove operation(s). Why? See below.
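As a sketch of what that looks like with the 2.0 Erlang PB client (riakc): the bucket type <<"sets">> (assumed to have been created with the set datatype), the bucket, the key, and the member names are all made up for the example.

```erlang
%% Sketch only: assumes the riak-erlang-client (riakc) Data Type API and a
%% bucket type named <<"sets">> created with datatype = set.
{ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087),
%% Fetch first, so the client holds the Set's context...
{ok, Set0} = riakc_pb_socket:fetch_type(Pid, {<<"sets">>, <<"followers">>}, <<"bob">>),
%% ...then batch an add and a remove. <<"eve">> is assumed to be an element we
%% saw in the fetched value; the context travels with the generated operation.
Set1 = riakc_set:add_element(<<"carol">>, Set0),
Set2 = riakc_set:del_element(<<"eve">>, Set1),
ok = riakc_pb_socket:update_type(Pid, {<<"sets">>, <<"followers">>}, <<"bob">>,
                                 riakc_set:to_op(Set2)).
```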
All operations are executed atomically at the coordinating replica. If any operation in the list fails (only removes can fail!) then none of the operations are applied.
For Maps, you may send a list of operations. These are either field operations or field update operations. Field operations Add or Remove fields from the Map. I find it helps to think of the Map as a schema for a (JSON-like?) document: field operations alter the schema of the Map.
Field Update Operations act on the data stored in the Map. You may send any number of operations batched together. You may mix Field Operations and Field Update Operations.
For example, say you model Game State as a Map for a particular user and game. You could send an operation when the user starts a game that creates a Map, adds Counter fields for points and lives, a Set for achievements unlocked, and a Map for inventory that contains two Sets (armor and weapons) and a Counter for Hit Points.
As the game is played operations that update multiple Counters, add and remove elements from Sets and so on can be sent as batches that execute atomically at the coordinating replica. This does not suggest that you can enforce co-invariants between values in the Map.
What goes for Sets goes for Maps: we strongly recommend you send a context with any batch of operations that contains a Field or Set element Remove, no matter how deeply nested in the Map.
If you are only updating Fields in the Map then you do not need to fetch first, and you do not need a context, you may just send operations.
You do not need to explicitly create a field. Updating a field that is not present at the coordinating replica will create and update the field. For example, adding 10 to the Counter in Field <<"gold">> in the Map at key Game1 will create the field if it is not present, and then increment it by 10.
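For illustration, here is the kind of batch described above as it might look with the 2.0 Erlang PB client (riakc). This is a sketch only: the bucket type <<"maps">> (assumed to have been created with the map datatype), the bucket, key, and field names are invented for the example.

```erlang
%% Sketch only: assumes the riak-erlang-client (riakc) Data Type API and a
%% bucket type named <<"maps">> created with datatype = map.
{ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087),
%% Build a batch of field update operations locally. No fetch is needed for
%% updates; fields that don't exist yet (like the <<"gold">> counter) are
%% created implicitly.
Game0 = riakc_map:new(),
Game1 = riakc_map:update({<<"gold">>, counter},
                         fun(C) -> riakc_counter:increment(10, C) end, Game0),
Game2 = riakc_map:update({<<"achievements">>, set},
                         fun(S) -> riakc_set:add_element(<<"first_win">>, S) end, Game1),
Game3 = riakc_map:update({<<"inventory">>, map},
                         fun(Inv) ->
                                 riakc_map:update({<<"weapons">>, set},
                                                  fun(S) -> riakc_set:add_element(<<"sword">>, S) end,
                                                  Inv)
                         end, Game2),
%% The whole batch is applied atomically at the coordinating replica.
ok = riakc_pb_socket:update_type(Pid, {<<"maps">>, <<"games">>}, <<"game1">>,
                                 riakc_map:to_op(Game3)).
```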
Why the context? There are two reasons; the first is simple:
We don’t allow you to remove something from a Set / Map that is not there. Since there is no guarantee that the replica coordinating your remove operation(s) contains the value(s) you want to remove (imagine an empty fallback spun up to accept the request), the context “seeds” the handling replica with the values you’ve seen. If you don’t send the context, and the replica doesn’t have the value(s) you want to remove, the operation fails with a “precondition failure” error. A precondition of removing an element or Field is that it is present.
The second reason is more subtle and is going to need some implementation specifics, which I’ll cover later, to really understand. At this point it is enough to say that without a context for a remove, you may remove more than you planned to. The “Add Wins” semantic is based on “Observed Remove”, which means you only remove that which you have seen. The context tells the replica handling the operation what you’ve seen. If an “Add” for the element you want to remove was handled or seen by the replica after you sent your remove, and there was no context, the remove would win over the concurrent add. There may be times you want this, but in general, use the context for removes.
The context is a compact binary encoding of the Set or Map. We hope to minimize it further in future releases.
The main cost is that the Data Types take up space. There is also some computational cost to the merge functions, which will be performed on your Riak servers rather than in your client application. We have yet to measure this.
We store the Data Types in riak_objects. This is so they play nicely with all of Riak’s systems, like AAE, Enterprise Multi Data Center Replication, read repair, etc. So, off the bat, we have the overhead of a riak_object.
The Data Types themselves are at least as big as what they contain,
plus a version vector, plus some Dots
(see below.) We’ve tried to
keep them small with an efficient binary representation (and we’ll
keep improving on that), but they are larger than one might first
imagine.
For a Counter, expect: a single integer.
IRL: a version vector, with two integers per actor that has coordinated an increment. Each actor is 8 bytes. Expect at least N-Val actors.
For a Set, expect: the sum of the size of its members.
IRL: the sum of the size of its members, plus a version vector, and a minimal version vector (at most the size of the Set’s vector, typically one {actor, count} pair) per member. The size of the version vector depends on N-Val, MDC, cluster churn, etc. Again, 8 bytes per actor, though we only store each actor once.
For a Map, expect: the sum of the size of the keys plus the sum of the size of the values.
IRL: each key is a pair of {name, int}, where the int maps to a module that implements the Data Type of the field. Each member also has a minimal clock (as for the Set.)
Some other limitations to be aware of:
- No invariants.
- Counter increments aren’t idempotent.
- You can’t store non-CRDT values in the Maps.
- No 2i, riak_search or Yokozuna as of 2.0pre5.
- No JavaScript MapReduce over CRDTs as they’re a binary format.
- Can't cook your dinner.
You know enough to use Riak's Data Types now. If you really want to know how the sausage is made, read on.
It is worth remembering at this point that Vnodes are the unit of concurrency in Riak. Whenever I say Actor I mean a Vnode and its Vnode Id. Correctness in Riak depends on individual Actors acting serially.
The Counter is a CRDT called a PN-Counter, where P is Positive and N
is Negative. It is a list of triples of {actor, positive, negative}
where the value is the difference between the sum of all positives and
the sum of all negatives. An actor may only update its own entry in
the list. When two counters merge we take the maximum of positive and
negative for each actor. When an actor is only in one counter, we just
keep its value in the merged counter.
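Here is a toy version of that structure and its merge in plain Erlang. It is a sketch of the idea described above, not Riak’s riak_dt implementation.

```erlang
-module(pn_counter).
-export([new/0, value/1, increment/3, merge/2]).

%% Toy PN-Counter: a list of {Actor, Positive, Negative} triples. Each actor
%% only ever updates its own entry; merge takes the per-actor maximums.

new() -> [].

value(Counter) ->
    lists:sum([P - N || {_Actor, P, N} <- Counter]).

%% A positive Amount increments, a negative Amount decrements.
increment(Actor, Amount, Counter) when Amount >= 0 ->
    update(Actor, Amount, 0, Counter);
increment(Actor, Amount, Counter) ->
    update(Actor, 0, -Amount, Counter).

update(Actor, AddP, AddN, Counter) ->
    case lists:keyfind(Actor, 1, Counter) of
        {Actor, P, N} ->
            lists:keyreplace(Actor, 1, Counter, {Actor, P + AddP, N + AddN});
        false ->
            [{Actor, AddP, AddN} | Counter]
    end.

%% Merge: maximum positive and negative per actor; actors known to only one
%% side keep their value unchanged.
merge(CounterA, CounterB) ->
    Actors = lists:usort([A || {A, _, _} <- CounterA ++ CounterB]),
    [begin
         {_, PA, NA} = entry(Actor, CounterA),
         {_, PB, NB} = entry(Actor, CounterB),
         {Actor, max(PA, PB), max(NA, NB)}
     end || Actor <- Actors].

entry(Actor, Counter) ->
    case lists:keyfind(Actor, 1, Counter) of
        false -> {Actor, 0, 0};
        Entry -> Entry
    end.
```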
Flags are logically equivalent to a Set that can only contain one element. Whether the element is present or absent is equivalent to whether the flag is On or Off, respectively. The same “Add Wins” / “Observed Remove” behavior applies, except with Flags we call it “Observed Disable”. To the user Flags look like Boolean values.
Registers are a pair of {value, timestamp}
. They converge on the
highest timestamp. Much like a value in Cassandra. They require well
synchronized clocks. When two registers merge, the pair with the
highest timestamp is the merged value.
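As a sketch, the whole convergence rule is a couple of clauses (a toy, not the riak_dt module):

```erlang
-module(lww_register).
-export([merge/2]).

%% Toy LWW register: a {Value, Timestamp} pair that converges on the highest
%% timestamp. Ties are broken arbitrarily here in favour of the first argument.
merge({_, T1} = R1, {_, T2}) when T1 >= T2 -> R1;
merge(_, R2) -> R2.
```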
The crucial part of any convergent, or state based CRDT is its merge function. The merge function is the LUB of the Join-Semi-Lattice, and it is what defines the semantic of the data type, as well as being a generalization of all those ad hoc conflict resolutions customers might have had to write.
The merge function for an optimized OR-Set[3] is pretty simple for any element that is in both sets: it is in the merged set. The difficulties arise when an element is only in one of the two Sets being merged.
When two replicas merge, and one contains an element in its Set that the other does not, why is it there? It can either be:
- The element was added to one replica and the other is yet to see it
- The element was once in both replicas but one has removed it already.
We need to know why an element is only in one set to arrive at a correct merged value. As ever in these things, causality to the rescue. It stands to reason that if the element was once in Set A but is no longer, it was removed. We could store a tombstone value for the removed item, but that means our sets never get smaller: a Set with one member that once had 100 members would be the same size as a Set with 100 members.
Instead, what we do is attach a version vector to the Set, and every time an element is added to the Set, we increment the entry in the version vector for the replica that added the element. We also store the {actor, count} pair that results from the increment against the element (which I’m going to call the Dot[4] from now on.) If there is already a Dot associated with the element we keep that too. What we end up with looks like [{actor1, count}=Dot1, {actor2, count}=Dot2, …], which is a version vector, but it is a minimal clock that stores only the Dots, or events, when the element was added. Note that the version vector attached to the whole Set will always dominate all the minimal clocks for all elements.
When an element is removed from the set we simply remove the Element and its minimal clock.
Now, when a merge occurs we compare the two sets. We take all the
elements that are in Set A
and not in Set B
and compare their
minimal clocks to Set B’s set version vector. Every element whose
minimal clock is dominated has been removed from Set B, and does not
make it into the merged set. As a slight optimization, we also drop
any dots from the minimal clock that are dominated by Set B’s
clock. This keeps the minimal clock minimal. You can think of it as
subtracting the minimal clock from Set B’s set version vector, if any
Dots are left, the element is in the merged set with those remaining
Dots as the new minimal clock.
We repeat the process the other way, comparing all Set B’s elements that are not in Set A to Set A’s set version vector.
We keep all elements that are in both Sets, merging their minimal clocks. Finally we merge the two set version vectors to ensure the property that the Set version vector always dominates all minimal clocks is maintained.
This kind of fine-grained causality tracking is very closely related to Dotted Version Vectors[4].
Sets are implemented as a version vector, and a dictionary of mappings from element -> minimal clock.
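Pulling the description above together, here is a toy Erlang sketch of that structure and merge. It follows the prose, not Riak’s riak_dt code: the set clock and each element’s minimal clock are kept as orddicts of Actor -> Count.

```erlang
-module(or_set).
-export([new/0, add/3, remove/2, value/1, merge/2]).

%% Toy "add wins" OR-Set, a sketch of the scheme described above (not Riak's
%% riak_dt implementation). State is {SetClock, Entries} where
%%   SetClock : version vector, an orddict of Actor -> Count
%%   Entries  : orddict of Element -> minimal clock (the Dots for its adds)

new() -> {orddict:new(), orddict:new()}.

%% Adding increments the actor's entry in the set clock and records the
%% resulting {Actor, Count} event (the Dot) against the element.
add(Element, Actor, {Clock, Entries}) ->
    Count = seen(Actor, Clock) + 1,
    Clock1 = orddict:store(Actor, Count, Clock),
    Dots0 = case orddict:find(Element, Entries) of
                {ok, D} -> D;
                error   -> orddict:new()
            end,
    {Clock1, orddict:store(Element, orddict:store(Actor, Count, Dots0), Entries)}.

%% Removing just drops the element and its minimal clock: "observed remove".
remove(Element, {Clock, Entries}) ->
    {Clock, orddict:erase(Element, Entries)}.

value({_Clock, Entries}) ->
    orddict:fetch_keys(Entries).

merge({ClockA, EntriesA}, {ClockB, EntriesB}) ->
    Elements = lists:usort(orddict:fetch_keys(EntriesA) ++ orddict:fetch_keys(EntriesB)),
    Entries = lists:foldl(
                fun(Elem, Acc) ->
                        case merge_entry(find(Elem, EntriesA), find(Elem, EntriesB),
                                         ClockA, ClockB) of
                            []   -> Acc;   %% every Dot was seen: it was removed
                            Dots -> orddict:store(Elem, Dots, Acc)
                        end
                end, orddict:new(), Elements),
    {merge_clocks(ClockA, ClockB), Entries}.

%% In both sides: keep the element, merging its minimal clocks.
merge_entry(DotsA, DotsB, _ClockA, _ClockB) when DotsA =/= none, DotsB =/= none ->
    merge_clocks(DotsA, DotsB);
%% Only in A: keep the Dots that B's set clock has NOT seen. If none survive,
%% B has seen every add of this element and removed it, so it stays removed.
merge_entry(DotsA, none, _ClockA, ClockB) ->
    [{Actor, Cnt} || {Actor, Cnt} <- DotsA, Cnt > seen(Actor, ClockB)];
merge_entry(none, DotsB, ClockA, _ClockB) ->
    [{Actor, Cnt} || {Actor, Cnt} <- DotsB, Cnt > seen(Actor, ClockA)].

merge_clocks(ClockA, ClockB) ->
    orddict:merge(fun(_Actor, A, B) -> max(A, B) end, ClockA, ClockB).

seen(Actor, Clock) ->
    case orddict:find(Actor, Clock) of
        {ok, Count} -> Count;
        error       -> 0
    end.

find(Key, Dict) ->
    case orddict:find(Key, Dict) of
        {ok, Value} -> Value;
        error       -> none
    end.
```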
As far as implementation goes, Maps are just like the Set described above: they use a Map version vector, and a minimal clock per Field, in the same way, to decide what to do with a Field that is only on one side of a merge.
The main difference is of course when an element is in both Maps: we call the Data Type’s merge function to get a single, convergent value. Conceptually merging two Maps is the same as merging two Sets of Fields, and then calling merge on all common Fields' values.
[1] Specification of Convergent Abstract Data Types for Autonomous Mobile Computing. Carlos Baquero, Francisco Moura. http://gsd.di.uminho.pt/members/cbm/ps/scadt3.pdf
[2] Dynamo: Amazon’s Highly Available Key-value Store. http://dl.acm.org/citation.cfm?id=1294281
[3] An Optimized Conflict-free Replicated Set. Annette Bieniusa et al. http://arxiv.org/pdf/1210.3368.pdf
[4] Dotted Version Vectors: Logical Clocks for Optimistic Replication. Nuno Preguiça, Carlos Baquero, Paulo Sérgio Almeida, Victor Fonte, Ricardo Gonçalves. http://arxiv.org/abs/1011.5808