@eric /README.md
Last active Dec 17, 2015

Zookeeper-based storage adapter for rollout

Update

You can now find this packaged up nicely in a rubygem as rollout-zk.


Zookeeper storage adapter for rollout

I've implemented a zookeeper-based storage adapter for rollout that does not require any network roundtrips to check if a feature is active for a user.

It keeps the entire dataset in memory (which will generally be very small) and uses zookeeper watches to update the local cache as soon as anything changes.
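
To make the idea concrete, here is a minimal sketch of a watch-driven local cache. It does not talk to zookeeper: `FakeNode` and `WatchedCache` are hypothetical stand-ins invented for this illustration, and the stdlib `json` library is used in place of Yajl. Real zookeeper watches are one-shot, which is why the adapter re-arms them with `:watch => true` on every read; the fake node here simply keeps its watchers registered.

```ruby
require 'json'

# Hypothetical in-memory stand-in for a znode; not part of the zk gem.
class FakeNode
  attr_reader :reads

  def initialize(data = '{}')
    @data = data
    @watchers = []
    @reads = 0
  end

  # Register a block to run whenever the stored data changes.
  def watch(&block)
    @watchers << block
  end

  # Counts every read so we can see how often the node is consulted.
  def get
    @reads += 1
    @data
  end

  def set(data)
    @data = data
    @watchers.each(&:call)
  end
end

# Keeps a parsed copy of the node in memory and refreshes it on change.
class WatchedCache
  def initialize(node)
    @node = node
    @mutex = Mutex.new
    @node.watch { refresh }
    refresh
  end

  # Served from local memory; never touches the node.
  def [](key)
    @mutex.synchronize { @table[key] }
  end

  private

  def refresh
    table = JSON.parse(@node.get)
    @mutex.synchronize { @table = table }
  end
end

node = FakeNode.new
cache = WatchedCache.new(node)
node.set(JSON.generate('feature:chat' => '100'))
reads_after_update = node.reads
1000.times { cache['feature:chat'] }
```

The node is read once at startup and once per change; the thousand lookups afterwards leave `node.reads` untouched.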

If you haven't migrated to rollout v2, you can just use the :legacy_storage option and things will migrate over from redis. If you have already migrated, it would take some manual work (which I think would be fairly easy to automate).

Usage

    $rollout = Rollout.new(
      ZookeeperRolloutStorage.new($zookeeper, "/rollout/users"),
      :legacy_storage => $redis, :migrate => true)

Items of note

  • it uses the zk gem
  • all settings are stored in a single znode
  • this will automatically migrate everything properly if you haven't migrated to rollout v2 yet
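
As a rough illustration of the single-znode layout, the sketch below serializes a whole settings hash into one JSON string and back. The feature keys and values are hypothetical, and the stdlib `json` library stands in for Yajl so the snippet runs without extra gems.

```ruby
require 'json'

# Hypothetical feature values, for illustration only.
settings = {
  'feature:chat'   => '25',
  'feature:search' => '100'
}

# The whole hash becomes one string: the payload of a single znode.
payload = JSON.generate(settings)

# A reader parses the payload once and answers lookups from the result.
restored = JSON.parse(payload)
restored['feature:chat']

# Changing one key means rewriting the whole payload.
restored['feature:chat'] = '50'
new_payload = JSON.generate(restored)
```

Because every write replaces the full payload, this layout suits a small dataset that is read far more often than it is written.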
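
    require 'zk'
    require 'yajl'

    module Zookeeper
      # A hash stored as JSON in a single znode and mirrored in local memory.
      class DistributedHashtable
        def initialize(zk, path)
          @zk = zk
          @path = path
          @mutex = Mutex.new
          @callbacks = []

          configure_watches
        end

        # Register a callback for changes; it is also invoked once immediately.
        def on_change(&block)
          @mutex.synchronize do
            @callbacks << block
          end
          block.call
        end

        # Run every registered callback outside the lock.
        def fire
          callbacks = @mutex.synchronize do
            @callbacks.dup
          end

          callbacks.each do |cb|
            begin
              cb.call
            rescue => e
              # Report an exception somewhere
            end
          end
        end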

        # Served from the local copy; returns nil until the first read succeeds.
        def [](key)
          @mutex.synchronize do
            if @hashtable
              return @hashtable[key]
            end
          end
        end

        def []=(key, value)
          result = @mutex.synchronize do
            update do |hashtable|
              hashtable[key] = value
            end
          end
          fire
          return result
        end

        def has_key?(key)
          @mutex.synchronize do
            if @hashtable
              @hashtable.has_key?(key)
            end
          end
        end

        def delete(key)
          result = @mutex.synchronize do
            update do |hashtable|
              hashtable.delete(key)
            end
          end
          fire
          return result
        end

        def merge(other)
          result = @mutex.synchronize do
            update do |hashtable|
              # merge! mutates in place; a plain merge would be discarded
              hashtable.merge!(other)
            end
          end
          fire
          return result
        end

        def to_h
          @mutex.synchronize do
            if @hashtable
              @hashtable.dup
            else
              {}
            end
          end
        end

        def each(&block)
          to_h.each(&block)
        end

        def length
          @mutex.synchronize do
            if @hashtable
              @hashtable.length
            else
              0
            end
          end
        end

        def empty?
          length == 0
        end
        alias_method :blank?, :empty?

        # Reload the local copy from the znode and re-arm the watch.
        def read
          @mutex.synchronize do
            begin
              current, _ = @zk.get(@path, :watch => true)
              @hashtable = Yajl::Parser.parse(current)
            rescue ZK::Exceptions::NoNode
              if @zk.exists?(@path, :watch => true)
                retry
              else
                @hashtable = Hash.new
              end
            end
          end
          fire
        end

        def update(&block)
          return update_exists(&block)
        rescue ZK::Exceptions::NoNode
          begin
            return update_initial(&block)
          rescue ZK::Exceptions::NodeExists
            return update_exists(&block)
          end
        end

        # Optimistic update: the write only succeeds if the znode version
        # is unchanged since the read; otherwise back off and try again.
        def update_exists(&block)
          begin
            current, stat = @zk.get(@path, :watch => true)
            hashtable = Yajl::Parser.parse(current)
            result = block.call(hashtable)
            @zk.set(@path, Yajl::Encoder.encode(hashtable), :version => stat.version)
            @hashtable = hashtable
            return result
          rescue ZK::Exceptions::BadVersion
            sleep 0.1 + rand
            retry
          end
        end

        # First write: create the znode, making parent paths if needed.
        def update_initial(&block)
          begin
            hashtable = Hash.new
            result = block.call(hashtable)
            @zk.create(@path, Yajl::Encoder.encode(hashtable))
            @hashtable = hashtable
            return result
          rescue ZK::Exceptions::NoNode
            @zk.mkdir_p(File.dirname(@path))
            retry
          end
        end

        # Reload whenever the znode changes or the connection is established.
        def configure_watches
          @register ||= @zk.register(@path) do
            read
          end

          @on_connected ||= @zk.on_connected do
            read
          end

          begin
            read
          rescue ZK::Exceptions::OperationTimeOut,
                 ::Zookeeper::Exceptions::ContinuationTimeoutError,
                 ::Zookeeper::Exceptions::NotConnected
            # Ignore these, we'll get them next time
            # record something with your metrics provider
          end
        end
      end
    end

    require 'zookeeper/distributed_hashtable'

    # Storage adapter exposing the get/set interface that rollout calls.
    class ZookeeperRolloutStorage
      def initialize(zk, path)
        @cache = Zookeeper::DistributedHashtable.new(zk, path)
      end

      def get(key)
        @cache[key]
      end

      def set(key, value)
        @cache[key] = value.to_s
      end
    end
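
The write path above relies on versioned, compare-and-set style updates: read the data with its version, change it, and write it back only if the version is unchanged, retrying otherwise. The sketch below shows that pattern in isolation. `VersionedStore`, `BadVersion`, and the top-level `update` helper are hypothetical names made up for this example; they imitate the behaviour of `@zk.set(..., :version => ...)` without needing a zookeeper server.

```ruby
# Hypothetical error and store that mimic a versioned compare-and-set.
class BadVersion < StandardError; end

class VersionedStore
  def initialize(data)
    @data = data
    @version = 0
    @mutex = Mutex.new
  end

  # Returns a copy of the data together with the version it was read at.
  def get
    @mutex.synchronize { [@data.dup, @version] }
  end

  # Writes only if nobody else has written since `version` was read.
  def set(data, version)
    @mutex.synchronize do
      raise BadVersion unless version == @version
      @data = data
      @version += 1
    end
  end
end

# Read, modify, write; start over if another writer got there first.
def update(store)
  data, version = store.get
  yield data
  store.set(data, version)
rescue BadVersion
  sleep(rand * 0.01) # short random backoff before retrying
  retry
end

store = VersionedStore.new({ 'count' => 0 })
threads = 4.times.map do
  Thread.new do
    25.times { update(store) { |h| h['count'] += 1 } }
  end
end
threads.each(&:join)
final, version = store.get
```

Every successful write bumps the version exactly once, so with four threads doing 25 increments each, no update is lost even though the writers never hold a shared lock across the read and the write.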