@sandro
Created August 31, 2011 17:22
HashProxy
.gitignore

*.gem
.bundle
Gemfile.lock
pkg/*
vendor

.rvmrc

rvm use 1.9.2

HashProxy

A distributed, persistent key/value store with language-agnostic storage engines.

Requirements

  • Ruby 1.9.2 (Fibers)
  • ZeroMQ
  • ConsistentHashr gem

Example

  1. Start a node to store data

    bundle exec bin/hash-proxy-node

  2. Start the proxy server

    bundle exec bin/hash-proxy

  3. Connect the client to the proxy

$ bundle console
>> c = HashProxy::Client.new
=> #<HashProxy::Client:0x00000100b7ff38 @endpoint="tcp://127.0.0.1:6789", @ctx=#<ZMQ::Context:0x00000100b7fee8>, @socket=#<ZMQ::Socket:0x00000100b7fec0>>
>> c.get('key')
=> nil
>> c.set('key', 'value')
=> "value"
>> c.get('key')
=> "value"
>> c.list
=> ["key"]
>> c.delete('key')
=> "value"
>> c.get('key')
=> nil
>> c.list
=> []
  4. Start a second node

    bundle exec bin/hash-proxy-node

    Stop the first node with Ctrl-C, then check c.list; the array will be empty.

  5. Check the dump (persistence)

    The proxy records every SET and DELETE in a log file named 'dump'. Roughly once a second it appends the buffered commands to the file, but only after at least 1000 have accumulated, so the most recent writes may not be on disk yet.

    $ cat dump

  6. Every 60 seconds the proxy compacts the log in a forked process, keeping only the latest SET for each key that has not since been deleted.

  7. Restarting the proxy while the log is present redistributes the stored key/value pairs across the connected nodes. The proxy starts replaying the log as soon as the first node registers, so it's best to start the nodes before the proxy. If a node is missing during the replay, its keys are stored on the next available node; when the original node returns, the proxy looks for those keys on the original node again and won't find them.

  8. Because ZeroMQ is the transport, nodes can be written in any language that has a ZeroMQ binding. See node.py for a Python example (python node.py).

  9. Don't need persistence or distribution? Connect directly to a node. A small benchmark shows this to be twice as fast. The serve command binds the node to the client's default endpoint, tcp://127.0.0.1:6789, so the proxy must not be running on that port.

$ bundle exec bin/hash-proxy-node serve
$ bundle console
>> c = HashProxy::Client.new
>> c.set('foo', 'bar')
>> c.get('foo')
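Whether the client talks to the proxy or directly to a node, the session above is a thin layer over a plain-text request/reply protocol: a request is INSTRUCTION:key[:value], and a data reply is ACK<INSTRUCTION>:value, where an empty GET or DELETE value means the key was absent. The sketch below mirrors the decoding in HashProxy::Client#process; the helper names encode_request and decode_reply are hypothetical and not part of the gem.

```ruby
# Hypothetical helpers illustrating HashProxy's text protocol (not part of the gem).

# Build a request string such as "SET:key:value" or "LIST:".
def encode_request(instruction, key = nil, value = nil)
  parts = [instruction.to_s.upcase, key.to_s]
  parts << value.to_s unless value.nil?
  parts.join(":")
end

# Decode a reply the way HashProxy::Client#process does: split on the
# first ":" only, so values may themselves contain colons.
def decode_reply(data)
  instruction, value = data.split(":", 2)
  value ||= ""
  case instruction
  when "ACKLIST", "ACKSET"
    value
  when "ACKGET", "ACKDELETE"
    value.empty? ? nil : value   # an empty value means "no such key"
  else
    raise ArgumentError, "Unknown response: #{data}"
  end
end

puts encode_request(:set, "key", "a:b")   # prints SET:key:a:b
puts encode_request(:list)                # prints LIST:
p decode_reply("ACKGET:")                 # prints nil
p decode_reply("ACKGET:a:b")              # prints "a:b"
```

Because the reply is split at most once, a value containing colons survives intact; keys, however, must not contain ":" since it separates the request fields.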

Thoughts

Having the nodes and clients all connect to a single process makes distribution very simple. Nodes can be added and removed at will without changing the client, and the proxy server is the only IP/port that needs to be configured. While configuration is dead simple, the proxy is a single point of failure in the system. I've made no provisions for dealing with this problem, though it is solvable.

I'm curious to know where the major speed bottleneck is. Early on, I imagined the cache store (node) would be the bottleneck, which is why I designed the nodes to be language-agnostic. I'd love to see a Ruby node compared to a C node. However, when I connect directly to a node, skipping the proxy, it's clear that the proxy itself adds some latency, which is disappointing.

lib/hash_proxy/client.rb

module HashProxy
  # Speaks HashProxy's text protocol over a ZeroMQ REQ socket, either to
  # the proxy or directly to a node.
  class Client
    def initialize(endpoint = "tcp://127.0.0.1:6789")
      @endpoint = endpoint
      @ctx = ZMQ::Context.new
      @socket = @ctx.socket(ZMQ::REQ)
      at_exit { @socket.close; @ctx.close }
      connect
    end

    def connect
      @socket.connect(@endpoint)
    end

    # Returns the keys as an array; commas inside keys arrive escaped.
    def list
      list_raw.split(",").map { |s| URI.unescape(s) }
    end
    alias keys list

    # Returns the comma-separated key list exactly as received.
    def list_raw
      @socket.send("LIST:", ZMQ::NOBLOCK)
      process(@socket.recv)
    end

    def get(key)
      @socket.send("GET:#{key}", ZMQ::NOBLOCK)
      process(@socket.recv)
    end
    alias [] get

    def set(key, value)
      @socket.send("SET:#{key}:#{value}", ZMQ::NOBLOCK)
      process(@socket.recv)
    end
    alias []= set

    def delete(key)
      @socket.send("DELETE:#{key}", ZMQ::NOBLOCK)
      process(@socket.recv)
    end

    # Replies look like "ACKGET:value"; an empty GET/DELETE value means nil.
    def process(data)
      instruction, value = data.split(":", 2)
      case instruction
      when "ACKLIST", "ACKSET"
        value
      when "ACKGET", "ACKDELETE"
        value unless value.empty?
      else
        raise "Unknown response: #{data}"
      end
    end
  end
end
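A LIST reply packs every key into one comma-separated string, so a comma inside a key has to be escaped before joining; the nodes write it as %2C and Client#list decodes it with URI.unescape. The sketch below shows that round trip with plain String#gsub, assuming one deliberate difference: it escapes "%" as well as ",", because escaping only commas (as the gem does) turns a literal "%2C" inside a key into a comma on the way back. The names encode_key_list and decode_key_list are hypothetical.

```ruby
# Hypothetical sketch of the LIST payload format (not the gem's code).
# Keys are joined with "," so any "," inside a key must be escaped first.
# Unlike the gem, which escapes only ",", this also escapes "%" so that
# decoding is lossless for every key.
ESCAPES = { "%" => "%25", "," => "%2C" }.freeze
UNESCAPES = ESCAPES.invert.freeze

def encode_key_list(keys)
  keys.map { |k| k.to_s.gsub(/[%,]/, ESCAPES) }.join(",")
end

def decode_key_list(payload)
  # An empty payload means no keys; "".split(",") already returns [].
  payload.split(",").map { |k| k.gsub(/%25|%2C/, UNESCAPES) }
end

keys = ["plain", "a,b", "100%", "a%2Cb"]
payload = encode_key_list(keys)
puts payload                        # prints plain,a%2Cb,100%25,a%252Cb
p decode_key_list(payload) == keys  # prints true
```

String#gsub with a Hash replacement looks up each match in the hash, which keeps the escape table in one place for both directions.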
Gemfile

source "http://rubygems.org"
# Specify your gem's dependencies in hash_proxy.gemspec
gemspec
bin/hash-proxy

#!/usr/bin/env ruby
require 'hash_proxy'

# Usage: hash-proxy [endpoint], defaulting to tcp://127.0.0.1:6789
endpoint = ARGV[0] && !ARGV[0].empty? ? ARGV[0] : "tcp://127.0.0.1:6789"
HashProxy::Proxy.new(endpoint).serve
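bin/hash-proxy-node

#!/usr/bin/env ruby
require 'hash_proxy'

# Usage: hash-proxy-node [register|serve] [endpoint]
#   register (default): listen on a free local port and join the proxy at endpoint
#   serve: bind the node itself to endpoint and accept clients directly
args = ARGV.dup
args.unshift('register') if args.empty?
endpoint = args[1] || "tcp://127.0.0.1:6789"

if args[0] == "register"
  HashProxy::Node.new.register(endpoint)
elsif args[0] == "serve"
  HashProxy::Node.new(endpoint).serve
else
  abort("Recognized commands are 'register' and 'serve'")
end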
hash_proxy.gemspec

# -*- encoding: utf-8 -*-
$:.push File.expand_path("../lib", __FILE__)
require "hash_proxy/version"

Gem::Specification.new do |s|
  s.name        = "hash_proxy"
  s.version     = HashProxy::VERSION
  s.authors     = ["Sandro Turriate"]
  s.email       = ["sandro.turriate@gmail.com"]
  s.homepage    = ""
  s.summary     = %q{A distributed, persistent key/value store}
  s.description = %q{A distributed, persistent key/value store with language-agnostic storage engines.}

  s.rubyforge_project = "hash_proxy"

  s.files         = `git ls-files`.split("\n")
  s.test_files    = `git ls-files -- {test,spec,features}/*`.split("\n")
  s.executables   = `git ls-files -- bin/*`.split("\n").map { |f| File.basename(f) }
  s.require_paths = ["lib"]

  # ZeroMQ bindings for transport, ConsistentHashr for key placement.
  s.add_runtime_dependency "zmq"
  s.add_runtime_dependency "consistent_hashr"
  s.add_development_dependency "ruby-debug19"
end
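lib/hash_proxy.rb

require "hash_proxy/version"

module HashProxy
  require 'zmq'
  require 'consistent_hashr'
  require 'fiber'
  require 'socket'
  # URI.escape/URI.unescape are used by the client, node, and proxy. They
  # exist in Ruby 1.9.2 but were removed in Ruby 3.0.
  require 'uri'

  autoload 'Client', 'hash_proxy/client'
  autoload 'Node', 'hash_proxy/node'
  autoload 'Proxy', 'hash_proxy/proxy'
  autoload 'RestructurePersistence', 'hash_proxy/restructure_persistence'

  # Adds ConsistentHashr.remove_server, which deletes each of a server's
  # replica points from the hash circle. It relies on the gem's internal
  # @number_of_replicas, @circle, and hash_key.
  module ServerRemover
    def remove_server(_name)
      @number_of_replicas.times do |t|
        @circle.delete hash_key("#{_name}+#{t}")
      end
    end
  end
  ConsistentHashr.extend ServerRemover
end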
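The ServerRemover patch above deletes a server's replica points from ConsistentHashr's circle, which is what makes the "next available node" behaviour possible: a key belongs to the first point at or after its hash, so removing a server only moves the keys that landed on its points. The standalone ring below is a sketch of that idea, not ConsistentHashr's implementation; the class name HashRing, CRC32 as the hash, and four replicas per server are all assumptions.

```ruby
require 'zlib'

# Minimal consistent-hash ring (a sketch; not ConsistentHashr's code).
# Each server is placed on the ring at several "replica" points so keys
# spread more evenly; a key belongs to the first point at or after its hash.
class HashRing
  def initialize(replicas = 4)
    @replicas = replicas
    @circle = {}          # ring position => server
  end

  def add_server(name, server)
    @replicas.times { |i| @circle[hash_key("#{name}+#{i}")] = server }
  end

  # Same idea as the ServerRemover patch: drop every replica point.
  def remove_server(name)
    @replicas.times { |i| @circle.delete(hash_key("#{name}+#{i}")) }
  end

  def get(key)
    return nil if @circle.empty?
    h = hash_key(key)
    points = @circle.keys.sort
    point = points.find { |p| p >= h } || points.first  # wrap around the ring
    @circle[point]
  end

  private

  def hash_key(s)
    Zlib.crc32(s.to_s)
  end
end

ring = HashRing.new
%w[a b c].each { |name| ring.add_server(name, :"node_#{name}") }
keys = (1..100).map { |i| "key#{i}" }
before = keys.map { |k| ring.get(k) }

ring.remove_server("b")
after = keys.map { |k| ring.get(k) }

# Keys that lived on node_a or node_c stay put; only node_b's keys move.
stable = keys.each_index.all? { |i| before[i] == :node_b || after[i] == before[i] }
puts stable   # prints true
```

Sorting the points on every lookup keeps the sketch short; a real ring would keep them sorted and binary-search.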
node.py

import atexit
import socket

import zmq


class Node:
    def __init__(self, endpoint=None):
        if endpoint:
            self.endpoint = endpoint
        else:
            self.endpoint = "tcp://127.0.0.1:%s" % self.next_available_port()
        self.ctx = zmq.Context()
        self.socket = self.ctx.socket(zmq.REP)
        self.store = {}

        def close_ctx():
            self.socket.close()
            self.ctx.term()
        atexit.register(close_ctx)

    def serve(self):
        self.socket.bind(self.endpoint)
        print("Node starting on %s" % self.endpoint)
        while True:
            data = self.socket.recv_string()
            self.process(data)

    def process(self, data):
        # maxsplit=2 yields at most three fields, so ":" inside a value is
        # kept (matching Ruby's data.split(":", 3)).
        properties = data.split(":", 2)
        instruction = properties[0]
        key = properties[1] if len(properties) > 1 else None
        value = properties[2] if len(properties) > 2 else None
        if instruction == "LIST":
            # Keys are comma-separated, so escape commas inside keys.
            keys = ",".join(k.replace(",", "%2C") for k in self.store.keys())
            self.send("ACKLIST", keys)
        elif instruction == "SET":
            self.store[key] = value
            self.send("ACKSET", value)
        elif instruction == "GET":
            self.send("ACKGET", self.store.get(key, ""))
        elif instruction == "DELETE":
            self.send("ACKDELETE", self.store.pop(key, ""))

    def register(self, endpoint):
        # Announce this node to the proxy; ":" is escaped because it is the
        # protocol's field separator.
        client = self.ctx.socket(zmq.REQ)
        client.connect(endpoint)
        self.send("NODE", self.endpoint.replace(":", "%3A"), client)
        client.recv_string()

        def notify_close():
            self.send("NODEGONE", self.endpoint.replace(":", "%3A"), client)
            client.recv_string()
            client.close()
        atexit.register(notify_close)
        self.serve()

    def next_available_port(self):
        # Bind to port 0 so the OS picks a free port, then release it.
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind(("", 0))
        s.listen(1)
        port = s.getsockname()[1]
        s.close()
        return port

    def send(self, key, value, sock=None):
        if sock is None:
            sock = self.socket
        return sock.send_string("%s:%s" % (key.upper(), value), zmq.NOBLOCK)


if __name__ == '__main__':
    Node().register('tcp://127.0.0.1:6789')
lib/hash_proxy/node.rb

module HashProxy
  # An in-memory storage node. It answers LIST/SET/GET/DELETE requests on a
  # ZeroMQ REP socket, either behind the proxy (register) or directly (serve).
  class Node
    def initialize(endpoint = nil)
      @endpoint = endpoint || "tcp://127.0.0.1:#{next_available_port}"
      @ctx = ZMQ::Context.new
      @socket = @ctx.socket(ZMQ::REP)
      at_exit { @socket.close; @ctx.close }
      @store = {}
    end

    def serve
      @socket.bind @endpoint
      puts "Node starting on #{@endpoint}"
      while data = @socket.recv
        process(data)
      end
    end

    # Requests look like "SET:key:value"; the limit of 3 keeps any ":" in the value.
    def process(data)
      instruction, key, value = data.split(":", 3)
      case instruction
      when "LIST"
        # Keys are comma-separated, so commas inside keys are escaped.
        send("ACKLIST", @store.keys.map { |s| URI.escape(s.to_s, ',') }.join(","))
      when "SET"
        send("ACKSET", @store[key] = value)
      when "GET"
        send("ACKGET", @store[key])
      when "DELETE"
        send("ACKDELETE", @store.delete(key))
      end
    end

    # Announce this node to the proxy, arrange to announce its departure at
    # exit, then serve requests. ":" is escaped because it separates fields.
    def register(endpoint)
      client = @ctx.socket(ZMQ::REQ)
      client.connect(endpoint)
      send("NODE", URI.escape(@endpoint, ":"), client)
      client.recv
      at_exit do
        send("NODEGONE", URI.escape(@endpoint, ":"), client)
        client.recv
        client.close
      end
      serve
    end

    private

    # Let the OS pick a free TCP port, then release it for ZeroMQ to bind.
    def next_available_port
      server = TCPServer.new('127.0.0.1', 0)
      @port = server.addr[1]
    ensure
      server.close if server
    end

    def send(key, value, socket = @socket)
      socket.send("#{key.to_s.upcase}:#{value}", ZMQ::NOBLOCK)
    end
  end
end
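A node's job reduces to turning a request string into a reply string against a Hash, which is also why a node is easy to write in other languages. The sketch below reproduces that dispatch without ZeroMQ; handle_request is a hypothetical name, it escapes commas with gsub instead of URI.escape (which no longer exists in Ruby 3.0 and later), and its else branch is an addition: the gem's node sends no reply to an unknown instruction.

```ruby
# Sketch of a storage node's request handling without ZeroMQ (hypothetical
# helper, mirroring HashProxy::Node#process). It takes the raw request
# string and returns the reply string the node would send back.
def handle_request(store, data)
  # A limit of 3 keeps any ":" inside the value, e.g. "SET:k:a:b" => "a:b".
  instruction, key, value = data.split(":", 3)
  case instruction
  when "LIST"
    keys = store.keys.map { |k| k.gsub(",", "%2C") }
    "ACKLIST:#{keys.join(',')}"
  when "SET"    then "ACKSET:#{store[key] = value}"
  when "GET"    then "ACKGET:#{store[key]}"
  when "DELETE" then "ACKDELETE:#{store.delete(key)}"
  else raise ArgumentError, "Unknown instruction: #{instruction.inspect}"
  end
end

store = {}
puts handle_request(store, "SET:url:http://example.com")  # prints ACKSET:http://example.com
puts handle_request(store, "GET:url")                      # prints ACKGET:http://example.com
puts handle_request(store, "DELETE:url")                   # prints ACKDELETE:http://example.com
puts handle_request(store, "GET:url")                      # prints ACKGET:
```

Interpolating nil yields an empty string, which is how a missing key becomes the bare "ACKGET:" reply that the client maps back to nil.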
lib/hash_proxy/proxy.rb

module HashProxy
  # Routes client requests to nodes by consistent hashing, logs SET/DELETE
  # commands to 'dump', and replays that log when it starts.
  class Proxy
    def initialize(endpoint)
      @endpoint = endpoint
      @ctx = ZMQ::Context.new
      @socket = @ctx.socket(ZMQ::REP)
      @nodes = {}
      @filename = 'dump'
      @file = File.open(@filename, 'a')
      @buffer = []
      # A finished thread, so not_restructuring? is true until the first fork.
      @restructure_persistence_fork = Thread.new {}
      at_exit { @socket.close; @ctx.close }
    end

    # Handle at most one request per turn, waiting up to 0.5 s for one.
    def read_fiber
      Fiber.new do
        while true
          if ZMQ.select([@socket], [], [], 0.5)
            data = @socket.recv
            process(data)
          end
          Fiber.yield
        end
      end
    end

    # Append buffered commands to the log once 1000 have accumulated,
    # unless a compaction is rewriting the file.
    def persistence_fiber
      Fiber.new do
        while true
          if @buffer.size >= 1000 && not_restructuring?
            @file.write @buffer.join("\n")
            @file.puts
            @file.fsync
            @buffer.clear
          end
          Fiber.yield
        end
      end
    end

    # Every 60 s, compact the log in a forked child process.
    def persistence_restructure_fiber
      Fiber.new do |tick|
        while true
          until tick > 60 && not_restructuring?
            tick += Fiber.yield
          end
          puts "Restructuring '#{@filename}' for greater efficiency."
          @file.fsync
          pid = fork { RestructurePersistence.new(@filename); exit! }
          @restructure_persistence_fork = Process.detach(pid)
          tick = 0
        end
      end
    end

    def tick_manager
      @tick_manager ||= TickManager.new
    end

    def not_restructuring?
      @restructure_persistence_fork.stop?
    end

    # Once the first node registers, replay the log to redistribute its keys.
    def recover_fiber
      Fiber.new do
        while @nodes.empty?
          Fiber.yield
        end
        puts "Attempting to recover from '#{@filename}'."
        File.open(@filename) do |f|
          f.each do |data|
            process(data.strip, true)
          end
        end
      end
    end

    # Cooperative main loop: resume each live fiber in turn, forever.
    def serve
      @socket.bind @endpoint
      puts "Server starting on #{@endpoint}"
      tick_manager.register(persistence_fiber)
      tick_manager.register(persistence_restructure_fiber)
      fibers = [read_fiber, recover_fiber, tick_manager]
      while true
        # Prune finished fibers first; deleting while iterating would skip one.
        fibers.reject! { |fiber| !fiber.alive? }
        fibers.each(&:resume)
      end
    end

    # noreply is true while replaying the log: nothing is sent back to a
    # client and nothing is logged again.
    def process(data, noreply = false)
      instruction, key, value = data.split(":", 3)
      case instruction
      when "NODE"
        key = URI.unescape(key)
        @nodes[key] = Client.new(key)
        ConsistentHashr.add_server(key, @nodes[key])
        send("ACK")
      when "NODEGONE"
        key = URI.unescape(key)
        @nodes.delete(key)
        ConsistentHashr.remove_server(key)
        send("ACK")
      when "LIST"
        aggregate_list
      when "SET"
        @buffer << data unless noreply
        client = ConsistentHashr.get(key)
        value = client.set(key, value)
        send("ACKSET", value) unless noreply
      when "GET"
        client = ConsistentHashr.get(key)
        send("ACKGET", client.get(key))
      when "DELETE"
        @buffer << data unless noreply
        client = ConsistentHashr.get(key)
        value = client.delete(key)
        send("ACKDELETE", value) unless noreply
      end
    end

    # Concatenate every node's key list, skipping empty lists so the
    # result never contains empty keys.
    def aggregate_list
      lists = @nodes.values.map { |client| client.list_raw }.reject(&:empty?).join(",")
      send("ACKLIST", lists)
    end

    private

    def send(key, value = nil, socket = @socket)
      socket.send("#{key.to_s.upcase}:#{value}", ZMQ::NOBLOCK)
    end
  end

  # Resumes its subscriber fibers with the seconds elapsed since the last
  # tick, at most about once per second.
  class TickManager
    def initialize
      @last_tick = Time.now
      @subscribers = []
    end

    def register(fiber)
      @subscribers << fiber
    end

    def fiber
      @fiber ||= Fiber.new do
        while true
          if tick > 1
            @subscribers.each { |f| f.resume(tick) if f.alive? }
            @last_tick = Time.now
          end
          Fiber.yield
        end
      end
    end

    def alive?
      fiber.alive?
    end

    def resume
      fiber.resume
    end

    # Seconds since the subscribers were last resumed.
    def tick
      Time.now - @last_tick
    end
  end
end
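The proxy runs everything on one thread: its main loop resumes each fiber in turn, and the tick manager resumes the persistence fibers with the elapsed time once more than a second has passed. The sketch below isolates that pattern with a fake clock so it runs deterministically. Ticker and flusher are hypothetical names, Ticker is a plain object rather than a fiber wrapper like TickManager, and the flush threshold of 3 stands in for the proxy's 1000.

```ruby
require 'stringio'

# Hypothetical sketch of the proxy's cooperative scheduling (simplified).
# Ticker plays the role of HashProxy::TickManager: when more than one second
# has passed, it resumes each subscriber fiber with the elapsed seconds.
class Ticker
  def initialize(clock)
    @clock = clock            # callable returning the current time in seconds
    @last = clock.call
    @subscribers = []
  end

  def register(fiber)
    @subscribers << fiber
  end

  def resume
    elapsed = @clock.call - @last
    return if elapsed <= 1
    @subscribers.each { |f| f.resume(elapsed) }
    @last = @clock.call
  end
end

# Like the proxy's persistence fiber: each time it is resumed, it appends
# the buffered log lines to `out` if at least `threshold` have accumulated.
def flusher(buffer, out, threshold)
  Fiber.new do
    loop do
      if buffer.size >= threshold
        out.write(buffer.join("\n"))
        out.puts
        buffer.clear
      end
      Fiber.yield
    end
  end
end

now = 0.0
clock = -> { now }          # fake clock so the example is deterministic
buffer = []
out = StringIO.new
ticker = Ticker.new(clock)
ticker.register(flusher(buffer, out, 3))

3.times { |i| buffer << "SET:k#{i}:v#{i}" }
ticker.resume               # 0 s elapsed: too soon, nothing is written
now = 1.5
ticker.resume               # 1.5 s elapsed: the flusher writes all three lines
print out.string            # prints SET:k0:v0, SET:k1:v1, SET:k2:v2 on separate lines
```

Because fibers only switch at explicit resume and yield points, the shared buffer needs no locking; the expensive compaction step is the one piece the proxy moves out of this loop, into a forked process.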
Rakefile

require "bundler/gem_tasks"
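lib/hash_proxy/restructure_persistence.rb

module HashProxy
  # Compacts the 'dump' log in place: keeps the latest SET line for each key
  # and drops keys whose last command was DELETE.
  class RestructurePersistence
    def initialize(filename)
      @filename = filename
      @store = {}
      read
      write
    end

    def read
      File.open(@filename, 'r') do |f|
        f.each do |data|
          # chomp so that "DELETE:key\n" yields "key" rather than "key\n"
          instruction, key, _value = data.chomp.split(":", 3)
          case instruction
          when "SET"
            @store[key] = data
          when "DELETE"
            @store.delete(key)
          end
        end
      end
    end

    def write
      File.open(@filename, 'w') do |f|
        f.write @store.values.join
      end
    end
  end
end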
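Compaction is a last-write-wins fold over the log, and recovery is the same fold replayed into the nodes. The sketch below works on an array of lines instead of the 'dump' file; compact_log is a hypothetical name. Note the chomp before splitting: without it, a DELETE line's key keeps its trailing newline and never matches the key stored by the earlier SET.

```ruby
# Sketch of the log compaction done by RestructurePersistence (hypothetical
# helper working on an array of lines instead of the 'dump' file).
# The latest SET for each key wins; a DELETE drops the key entirely.
def compact_log(lines)
  latest = {}
  lines.each do |line|
    # chomp first so "DELETE:key\n" yields the key "key", not "key\n"
    instruction, key, _value = line.chomp.split(":", 3)
    case instruction
    when "SET"    then latest[key] = line.chomp
    when "DELETE" then latest.delete(key)
    end
  end
  latest.values
end

log = ["SET:a:1\n", "SET:b:2\n", "SET:a:3\n", "DELETE:b\n", "SET:c:x:y\n"]
p compact_log(log)   # prints ["SET:a:3", "SET:c:x:y"]

# Replaying the compacted log rebuilds the same key/value pairs.
store = {}
compact_log(log).each { |line| _, k, v = line.split(":", 3); store[k] = v }
p store == { "a" => "3", "c" => "x:y" }   # prints true
```

Ruby hashes keep insertion order, so the compacted log lists keys in the order they were first set, and a key that is deleted and later set again moves to the end.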
lib/hash_proxy/version.rb

module HashProxy
  VERSION = "0.0.1"
end