Skip to content

Instantly share code, notes, and snippets.

@technoweenie
Created August 30, 2011 06:01
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save technoweenie/a0529c96f8587063b235 to your computer and use it in GitHub Desktop.
Save technoweenie/a0529c96f8587063b235 to your computer and use it in GitHub Desktop.
zerodb

ZeroDB is a simple key/value store using ZeroMQ as the transport. It's pretty basic and synchronous right now. Keys are written to disk as-is.

Requirements

Start a server

ruby server.rb

Start a Subscriber client to watch for changes

$ irb -r ./sub.rb
s = Subscriber.new
s.subscribe 'a'
s.on_set { |key| puts "#{key} :)" }
s.on_delete { |key| puts "#{key} :(" }
s.run

Start a key/value client

$ irb -r ./client.rb
c = Client.new
c.set 'abc', 123
c.set 'def', 456
c.get 'abc', 'def'
c.each { |key| puts key }
c.delete 'abc'
c.each { |key| puts key }
c.delete 'def'
require 'zmq'
class Client
class ServerError < StandardError
end
attr_reader :context, :req
def initialize(context = nil)
@context = context || ZMQ::Context.new
@req = @context.socket ZMQ::REQ
@req.connect 'tcp://127.0.0.1:5555'
end
def get(*keys)
@req.send "GET #{keys * " "}"
error? || begin
if keys.size == 1
@req.recv
else
keys.map { |k| @req.recv }
end
end
end
def set(key, value)
@req.send "SET #{key}", ZMQ::SNDMORE
@req.send value.to_s
error? || true
end
def delete(*keys)
@req.send "DELETE #{keys * " "}"
error? || true
end
def each(&block)
@req.send "KEYS"
error? || begin
while key = @req.recv
if key == "DONE"
return
else
block.call key
end
end
end
end
def error?
if @req.recv != 'OK'
raise ServerError, @req.recv
end
end
def close
@req.close
end
end
require 'zmq'
require 'fileutils'
class DB
def initialize
@root = File.join(File.dirname(__FILE__), 'data')
FileUtils.mkdir_p @root
end
def get(key)
name = file(key)
File.exist?(name) ? IO.read(name) : nil
end
def set(key, value)
File.open file(key), 'w' do |io|
io << value
end
end
def delete(key)
FileUtils.rm_rf key
end
def each(&block)
dir = Dir.new @root
dir.each do |entry|
next if entry =~ /^\./
block.call entry
end
end
def file(key)
File.join @root, key
end
end
class Server
def initialize(db, context = nil)
@db = db
@context = context || ZMQ::Context.new
@rep = @context.socket ZMQ::REP
@rep.bind 'tcp://*:5555'
@pub = @context.socket ZMQ::PUB
@pub.bind 'tcp://*:5556'
end
def run
while msg = @rep.recv
cmd, *keys = msg.split ' '
next unless valid_keys?(keys)
method = "#{cmd.downcase}_command"
if respond_to?(method)
send(method, keys)
else
error "Invalid command: #{cmd}"
end
end
rescue Interrupt
puts "CLOSING"
@rep.close
end
def get_command(keys)
if keys.length > 0
@rep.send 'OK', ZMQ::SNDMORE
last = keys.pop
keys.each do |k|
@rep.send @db.get(k).to_s, ZMQ::SNDMORE
end
@rep.send @db.get(last).to_s
else
error "Need at least 1 key"
end
end
def set_command(keys)
if keys.length == 1
@db.set keys[0], @rep.recv
@rep.send 'OK'
@pub.send "#{keys[0]} SET"
else
error "Need only 1 key"
end
end
def delete_command(keys)
keys.each do |k|
@db.delete k
@pub.send "#{k} DELETE"
end
@rep.send 'OK'
end
def keys_command(keys)
@rep.send 'OK', ZMQ::SNDMORE
@db.each do |key|
@rep.send key, ZMQ::SNDMORE
end
@rep.send "DONE"
end
def error(msg)
@rep.send "ERROR", ZMQ::SNDMORE
@rep.send msg
end
def valid_keys?(keys)
keys.each do |k|
if k =~ /^[a-z0-9]+$/
k.downcase!
else
error "Invalid key: #{k.inspect}"
return false
end
end
true
end
end
Server.new(DB.new).run
require 'zmq'
class Subscriber
def initialize(context = nil)
@context = context || ZMQ::Context.new
@sub = @context.socket ZMQ::SUB
@sub.connect 'tcp://127.0.0.1:5556'
@on_set = @on_delete = nil
end
def subscribe(channel)
@sub.setsockopt ZMQ::SUBSCRIBE, channel
end
def unsubscribe(channel)
@sub.setsockopt ZMQ::UNSUBSCRIBE, channel
end
def on_set(&block)
@on_set = block
end
def on_delete(&block)
@on_delete = block
end
def run
while msg = @sub.recv
key, cmd = msg.split ' '
block = case cmd
when 'SET' then @on_set
when 'DELETE' then @on_delete
end
block ? block.call(key) : puts(key)
end
rescue Interrupt, IRB::Abort
puts "CLOSING"
@sub.close
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment