Skip to content

Instantly share code, notes, and snippets.

@clr
Created September 7, 2012 00:48
Show Gist options
  • Save clr/3662032 to your computer and use it in GitHub Desktop.
Save clr/3662032 to your computer and use it in GitHub Desktop.

Rolling with Eventual Consistency

Keywords: riak, ripple, statistics, eventually consistent, race condition, CRDT, map/reduce, analytics

In a previous post I wrote about the different mindset that a software engineer should have when building for a key-value database as opposed to a relational database. When working with a relational database, you describe the model first and then query the data later. With a key-value database, you focus first on what you want the result of the query to look like, and then work backward toward a model.

As an example, I described averaging some value:

"In a SQL database, we can just call average() on that column, and it will compute the answer at query time. In a key-value store, we can add logic in the application layer to catch the object before it enters Riak, fetch the average value and number of included elements from Riak, compute the new rolling average, and save that answer back in Riak."

Some questions came in about what that would specifically look like in code. The rest of this post will explore a solution that takes into account the distributed, eventually consistent nature of Riak.

Average the Scenario

Say we have a DataPoint object. An object of this type can be created, but for simplicity let us say that it is never changed or deleted. Every DataPoint has a value property which is an integer. In our application, we sometimes want the average of all of the value properties for all of the DataPoint objects. Oh, and let’s say we have trillions of DataPoint objects.

Notice that according to my previous definition, this usage scenario is ad-hoc because we don’t know exactly when the application needs the average, but it is not dynamic because we know that we will be querying for the average of the value property, and not the average of some property defined at query time.

Using the client library ripple, we might define the object as follows:

class DataPoint
  include Ripple::Document
  property :value, Integer, :presence => true
end

The ripple library stores objects as JSON behind the scenes. The following request sent to Riak:

curl http://127.0.0.1:8091/buckets/data_points/keys/somekey

might return something like:

{"value":23,"_type":"DataPoint"}

Pretty simple so far. Now how should we calculate the average? The naïve solution would be to read all of the DataPoint objects out, read their value property, and average them. This might work for small sets of objects, but remember we have trillions of them. Fetching all of that data for one simple query is not a realistic solution.

We know that we want the average at the application level. Riak is really good at serving single objects. So instead of querying for the average, we should simply fetch the average as its own data object in Riak. In ripple this might look like:

class Statistic
  include Ripple::Document
  property :average, Float, :presence => true
end

Now we just have to hook the two together. We can add a hook to the DataPoint object so that every time it is saved, it updates the Statistic object.

class DataPoint
  include Ripple::Document
  property :value, Integer, :presence => true
  after_save :update_statistics

  def update_statistics
    id = 'data_point_document'
    statistic = StatisticDocument.find(id) || StatisticDocument.new
    statistic.key = id
    statistic.update_with self.value
  end
end

We need to define some new properties and the update_with method for the Statistic object.

class Statistic
  include Ripple::Document
  property :average, Float, :presence => true
  property :count, Integer, :presence => true

  def update_with(value)
    self.average = (self.average * self.count + value) / (self.count + 1)
    self.count = self.count + 1
    self.save
  end
end

By storing the additional information of the number of objects in the count property, we have enough information to reconstruct a rolling average.

Fast Answers

Now whenever we want the average value, we simply fetch the one Riak object:

Statistic.find('data_point_document').average

Our answer comes back really fast, because it has been pre-computed.

But Wait!

The example above works great for one single-threaded application. To preserve the distributed and fault-tolerant features that Riak provides for us, we need to do a little more work.

Consider the following scenario. One copy of the application, let’s call it ClientA, is talking to the Riak cluster. Another, ClientB, is also talking to the cluster. ClientA and ClientB both want to add a DataPoint object. After saving their respective DataPoint objects, they both fetch the Statistic object, which currently has average property set to 10.0, and compute a new average. ClientA thinks the new average property is 12.0. But ClientB added a different value, so it thinks the new average property is 9.0. Both save to the Riak cluster, and we have a classic race condition. Which client wins? It doesn’t matter, because both answers are wrong. Both fail to take into account the DataPoint object handled by the other client.

To get the correct answer, let’s separate the clients. Both clients can save to the same object, but we will partition the data within the object. Formerly the json object looked like:

{"average":10.0,"count":4,"_type":"Statistic"}

We want it to look more like:

{"client_data":{"ClientA":{"average":10.0,"count":4},"ClientB":{"average":10.0,"count":4}}"_type":"Statistic"}

The data model is more complicated, but we now have enough information to compute the correct answer. We will change the model so that ClientA will only change the "ClientA" portion of the data object, and ClientB will only change the "ClientB" portion of the object. The application will know when we ask for the answer to compute the average across all clients.

The Statistic object now looks something like this:

class Statistic
  include Ripple::Document
  property :client_data, Hash, :presence => true

  def update_with(value)
    self.reload
    self.client_data ||= {}
    statistic = self.client_data[Client.id] || {'average' => 0.0, 'count' => 0}
    statistic['average'] = (statistic['average'] * statistic['count'] + value) / (statistic['count'] + 1)
    statistic['count']   = (statistic['count'] + 1)
    self.client_data[Client.id] = statistic
    self.save
  end

  def count
    self.client_data.map{|h| h[1]['count']}.inject(0, &:+)
  end

  def average
    self.client_data.map{|h| h[1]['count'] * h[1]['average']}.inject(0, &:+).to_f / self.count
  end
end

The method Client.id in the above can be set in a couple of different ways. In this case, we set it using an environment variable passed in at runtime. We rely on a single thread per application to ensure that each client has a single, consistent client identifier, and that it always writes to its own portion of the data object.

We also make sure that we always fetch the object before we write to it, so that this client is always working with the latest copy of the object and not a stale one that was updated somewhere else in the application.

But Wait [Again]!

We solved part of the problem by keeping each client out of the other’s business, but we still have a race condition when the clients try to save the same Statistic object to the cluster. If Riak is operating on a last-write-wins principle, then whichever client has the later timestamp is going to overwrite the other. That gives us the wrong answer, and that won’t do.

We can rely on Riak’s vector clocks to solve this problem. First, we set the allow_mult property on the bucket for Statistic objects so that Riak will store both copies of an object when two come in during a race condition.

Statistic.bucket.allow_mult = true

For reasons outside of the scope of this post, we also want to make sure that we have a PR and a PW quorum greater than N. This helps us ensure that our answer isn't misread from a fallback node when we reload the Statistic object. N is 3 by default, so the following does the trick by setting PR and PW both to 2.

Statistic.bucket.props = Statistic.bucket.props.merge(:pr => 2, :pw => 2)

When ripple reads an object, it gets a vector clock. When it saves the object, it sends back the same vector clock. Every time Riak changes an object, it changes the vector clock in such a way that it preserves the clock's lineage. So if Riak gets back a different vector clock with an object than the one it currently has, it knows it has a collision and saves both values as siblings.

Our race condition plays out. We now have two sibling objects in Riak stored under the same key: one is up-to-date for ClientA, and the other is up-to-date for ClientB. The truth is somewhere in between.

The next time ripple reads the Statistic object, it gets back both objects with a new vector clock for the set. We now have to resolve the conflict to find the true answer. We know that ClientA's true answer is going to have the highest count property for "ClientA", and we know that ClientB's true answer is going to have the highest count property for "ClientB". Since we know that only one copy of a given client was writing at a time [single-threaded], we know that only the client data with the highest count is correct.

We can find the true answer by merging the siblings and comparing the data one client at a time. For each client, we always take the value with the highest count property and throw away the smaller counts, which represent older values.

When ripple saves back the object, it sends the new vector clock as well so that Riak knows to replace the old siblings with this new resolved object. We handle this with the on_conflict method in ripple.

class StatisticDocument
  include Ripple::Document
  property :client_data, Hash, :presence => true

  def update_with(value)
    self.reload
    self.client_data ||= {}
    statistic = self.client_data[Client.id] || {'average' => 0.0, 'count' => 0}
    statistic['average'] = (statistic['average'] * statistic['count'] + value).to_f / (statistic['count'] + 1)
    statistic['count']   = (statistic['count'] + 1)
    self.client_data[Client.id] = statistic
    self.save
  end

  def count
    self.client_data.map{|h| h[1]['count']}.inject(0, &:+)
  end

  def average
    self.client_data.map{|h| h[1]['count'] * h[1]['average']}.inject(0, &:+).to_f / self.count
  end

  on_conflict do |siblings, c|
    resolved = {}
    siblings.reject!{|s| s.client_data == nil}
    siblings.each do |sibling|
      resolved.merge! sibling.client_data do |client_id, resolved_value, sibling_value|
        resolved_value['count'] > sibling_value['count'] ? resolved_value : sibling_value
      end
    end
    self.client_data = resolved
  end
end

Voila! We now have a rolling average that gracefully handles race conditions caused by the distributed nature of a Riak cluster.

A working example of this solution is available in my riak-rolling-average repo. As usual, the good stuff is in the tests. You can run the tests with bundle exec rake CLIENT=someclient where someclient is unique to that Ruby thread.

Note that test_example.rb runs five clients concurrently by shelling out to five rake tasks. Each rake task loads in a pre-defined chunk of data from the data file. This causes plenty of race conditions, which is what we want to simulate. We still get the correct answer.

The Path Not Traveled

If you aren't familiar with my previous advice on approaching key-value architecture, you might be tempted to solve a use case like the average described above using Riak’s map/reduce functionality. There are several reasons why the solution above would be preferred, but let us play the devil’s advocate and explore the map/reduce solution.

You can view the entire code in the repo below, but if we solve this using javascript map/reduce functions, then we can extract the data from each DatePoint object in the following map phase:

function(riakObject){
  match = riakObject.values[0].data.match(/\"value\":([\\d]+)/);
  if(match){
    return [[parseInt(match[1]), 1]];
  } else {
    return [null];
  }
}

Then compute the average by combining the results together in the following reduce phase:

function(values){
  var sum = 0.0;
  var count = 0;
  for(i=0; i<values.length; i++){
    value = values[i];
    if(value){
      sum = sum + (value[0] * value[1]);
      count = count + value[1];
    }
  }
  if(count > 0) {
    return [[(sum / count), count]];
  } else {
    return [];
  }
}

If you look in the mapreduce branch of the riak-rolling-average repo, you will find the code for this example. On my local laptop, which I admit is not optimized for this type of operation, it takes about 3 seconds to fetch the answer with map/reduce searching over 5000 DataPoint objects. [A compiled Erlang map/reduce function would perform much better.] It only took a few milliseconds to fetch the answer using the pre-computing code.

Even if I did have a more powerful cluster on which to run this map/reduce, recall that we have trillions of DataPoint objects. Each object must be fetched from the key-value store and pulled into the javascript virtual machine for processing. This is essentially equivalent to the naïve approach described earlier, but performed on the server instead of the application client. If multiple users initiate the same map/reduce, contention for resources would quickly overwhelm the cluster's hardware. In practice, we generally reserve Riak's map/reduce for data migrations and offline analysis although there are exceptions to that guideline.

Conceptually, the map/reduce solution might be simpler to architect and it might seem more intuitive to people from an RDBMS background. Admittedly, the recommended solution above uses more resources during writes, opens the possibility of number of clients and vector clocks expanding the object size and complexity, and generally requires a more complex model; however, it also provides a much more performant answer and is more suitable to the eventual consistency and distributed nature of Riak.

Other Issues

Note that I did not address bootstrapping the average when DataPoint objects already exist, nor handling deletes or updates to existing DataPoint objects.

Happy Coding

Some of the ways we need to think about architecture problems when we write applications for Riak might not be intuitive at first, because the same problems are already solved by convention in the RDBMS world. Some of this shift in mindset is necessitated by the key-value nature of Riak. As the rolling average example here demonstrates, some concerns that we need to address in the application architecture are brought about by the distributed, eventually consistent nature of Riak as well. This shift in mindset is a tradeoff that you can choose to make for the sake of high availability, fault tolerance, horizontal scaling, and other nice features that Riak provides. If you enjoy learning new things to take advantage of new capabilities, then I'd wager you will enjoy developing applications with Riak.

Casey

Related

  • A working example of this code is available in my riak-rolling-average repo. Check out the mapreduce branch for the production-unfriendly version using Riak's map/reduce.
  • Vector Clocks are Easy
  • Vector Clocks are Hard
  • Meangirls provides serializable data types for eventually consistent systems, similar to but more comprehensive than my example.
  • Hanover also provides eventually consistent data types.
  • The Statistic object is an eventually consistent data type, an example of a CRDT: Convergent or Commutative Replicated Data Type. Read the comprehensive research paper on CRDT.
  • Bryce Kerley speaking about CRDTs at SRCFringe Edinburgh 2012
  • I put some hours into a gem ripple-statistics which adds this and other simple calculations to the ripple library. Note the limitations of the current code there.
@beerriot
Copy link

Hi, Casey. You asked for ways to emphasize that MR is great, but not the best solution for this problem. Here are a few I came up with:

  1. Re-emphasize that this use case is known at application-design/data-modeling time. The MR path is the only option (outside of brute-force reading and/or re-writing everything) for doing this query adhoc after the data already exists without the roll up included.
  2. Note that doing the calculation and storing the result (the non-MR) solution requires addition bandwidth (network traffic, write load, etc.) at write time. (Not emphasizing that MR is great, per se, but noting that there are trade offs.)
  3. Be more explicit about what changes would make the non-MR solution inappropriate. You mention at the top that the values are write-once without delete, but I'm not sure an inexperienced Riak user would understand why both of those are important. (More trade offs)
  4. Note that the non-MR solution has the same inherent problem as vector clocks: large numbers of actors lead to large objects and/or garbage to collect. Especially under high write contention, the number of siblings may skyrocket. (Again with the trade offs.)
  5. Describe what failure scenarios the non-MR solution has. For example, what if one of the two writes (value or stats) fails? (One last tradeoff)
  6. Use Erlang functions when comparing MR and non-MR speed. It's true that Javascript is easier to get going, but if the query is known in advance, it makes sense to spend time writing and installing Erlang code for it.

Great post otherwise, though.

@clr
Copy link
Author

clr commented Sep 11, 2012

Bryan, tyvm. I incorporated your comments, and you've convinced me that map/reduce needs its own blog post to reinforce this next month.

@reiddraper
Copy link

Ruby is single-threaded by nature. [Threading in Ruby is possible, but it takes explicit work.]

I'd find a way to reword this. By this definition (taking explicit work), most every common language is single threaded by nature.

@reiddraper
Copy link

We also make sure that we always fetch the object before we write to it, so that this client is always working with the latest copy of the object and not a stale one that was updated somewhere else in the application.

There's a little bit of a subtlety here, which I think mentioned I'd hold off on in our last conversation, but now is a good time to bring it up. Each of these actors (the ruby threads) requires read-your-writes consistency. Otherwise, they might create conflicts with themselves... In order to achieve this, we need to use PR + PW > N (technically there is a bug in Riak that makes these constraints not even quite enough...). R + W > N is not enough, because Riak uses sloppy quorums (ie. fallbacks can be used to fulfill a quorum).

@reiddraper
Copy link

For reasons outside of the scope of this post, we also want to make sure that we have a PR and a PW quorum greater than N.

I'd reword this:

For reasons outside the... have PR + PW > N.

@reiddraper
Copy link

Either way, this snippet doesn't make sense:

PR and a PW quorum greater than N

@randysecrist
Copy link

As an app developer - I get stuck on PR + PW > N, wanting to know either more about this or knowing what type of write availability impact this change has on an app. Intuitively I think I understand 'why', but perhaps an image or sequence diagram in a related followup would help here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment