Skip to content

Instantly share code, notes, and snippets.

@technoweenie
Created June 8, 2011 00:02
Show Gist options
  • Save technoweenie/122849a52c5b33c5d890 to your computer and use it in GitHub Desktop.
Save technoweenie/122849a52c5b33c5d890 to your computer and use it in GitHub Desktop.
Dropbear: crappy Dropbox "clone" (ZeroMQ, Riak)
require 'rubygems'
require 'fileutils'
require 'em-zeromq'
require 'yajl'
require File.expand_path('../dropbear_client_scanner', __FILE__)
Thread.abort_on_exception = true
module DropBear
# Outbound half of the sync client. Serializes command hashes to JSON
# (optionally followed by a newline and a raw body) and writes them to the
# socket handed to #on_writable. Anything pushed before a socket is
# available is held in an in-memory queue.
class Client
  # config - Hash; the 'id', 'set' and 'bind' keys are read here. 'id' and
  #          'set' are merged into every outgoing message, and a :connect
  #          command carrying 'bind' as :addr is queued immediately.
  def initialize(config)
    @hash = {:id => config['id'], :set => config['set']}
    @queue = []
    @socket = nil
    push :cmd => :connect, :id => config['id'], :addr => config['bind']
  end

  # Sends one message, or queues it when no socket has been seen yet.
  #
  # hash    - Hash of header fields; merged over the client's id/set.
  # content - optional String body appended after a "\n" separator.
  def push(hash, content = nil)
    if @socket
      payload = @hash.merge(hash)
      puts "PUSH: #{payload.inspect}"
      msg = Yajl.dump(payload)
      msg << "\n" << content if content
      @socket.send_msg msg
    else
      @queue << [hash, content]
    end
  end

  # Socket callback: remembers the socket and flushes queued messages.
  #
  # The queue is drained as it is flushed. Iterating without removing the
  # entries would re-send every queued message (including the :connect
  # handshake) each time this callback fires again.
  def on_writable(socket)
    @socket = socket
    push(*@queue.shift) until @queue.empty?
  end
end
# Inbound half of the sync client. Handles two commands read from the
# subscribed socket:
#
#   'send'  - read the named local file and push it to the handler in chunks
#   'chunk' - write the message body to the named local file
class Downloader
  # Number of bytes read from disk per outgoing chunk.
  CHUNK_SIZE = 16384

  # handler - object responding to push(hash, content).
  # root    - String directory that all file names are resolved against.
  def initialize(handler, root)
    @handler = handler
    @root = root
  end

  # Socket callback. Each message is a JSON header, optionally followed by
  # "\n" and a raw body.
  #
  # Raises ArgumentError for an unknown command or for a file name that
  # resolves outside the root.
  def on_readable(socket, messages)
    messages.each do |msg|
      json, content = msg.copy_out_string.split("\n", 2)
      header = Yajl.load(json)
      case header['cmd']
      when 'send'
        upload header['file']
      when 'chunk'
        # A header without a body yields nil here; treat it as empty.
        store header['file'], content.to_s
      else
        raise ArgumentError, "Unknown: #{header}"
      end
    end
  end

  private

  # Resolves a file name from a message to an absolute path under the root.
  # The name comes off the network, so anything that escapes the root
  # (e.g. via "../") is rejected rather than read or written.
  def local_path(relative)
    base = File.expand_path(@root)
    full = File.expand_path(relative.to_s, base)
    prefix = base.end_with?(File::SEPARATOR) ? base : base + File::SEPARATOR
    unless full.start_with?(prefix)
      raise ArgumentError, "Path escapes root: #{relative}"
    end
    full
  end

  # Pushes the file to the handler as a series of :chunk commands, each
  # carrying the file's total size and mtime so the receiver can tell when
  # it has everything.
  def upload(relative)
    full = local_path(relative)
    stat = File.stat(full)
    File.open(full, 'rb') do |f|
      while (chunk = f.read(CHUNK_SIZE))
        @handler.push({:cmd => :chunk, :file => relative,
                       :size => stat.size, :mtime => stat.mtime.to_i},
                      chunk)
      end
    end
  end

  # Writes the body to the file, creating missing parent directories first
  # so files in sub-directories can be received.
  def store(relative, content)
    full = local_path(relative)
    FileUtils.mkdir_p(File.dirname(full))
    File.open(full, 'wb') { |f| f << content }
    puts "WROTE: #{full} (#{content.size})"
  end
end
end
# Client entry point: reads settings from _dropbear.json in the current
# directory and syncs the directory given as the first argument (default '.').
settings = Yajl.load(File.read('_dropbear.json'))
sync_root = File.expand_path(ARGV.first || '.')

EM.run do
  zmq = EM::ZeroMQ::Context.new(1)
  client = DropBear::Client.new(settings)
  receiver = DropBear::Downloader.new(client, sync_root)

  # Outbound: PUSH socket connected to the configured host.
  zmq.connect(ZMQ::PUSH, settings['host'], client)

  # Inbound: SUB socket bound on every interface, reusing only the port
  # part of the configured bind address; the empty subscription filter
  # accepts all messages.
  port = settings['bind'].sub(/.*:/, '')
  inbound = zmq.bind(ZMQ::SUB, 'tcp://*:' + port, receiver)
  inbound.subscribe('')

  # Announce every local file under the sync root.
  DropBear::Scanner.new(client).scan_dir(sync_root)
end
require 'set'
module DropBear
# Walks a directory tree and pushes a :ping command (relative file name and
# mtime) to the handler for every readable, non-symlink entry that is not a
# directory.
class Scanner
  class << self
    # Set of entry names that are skipped wherever they appear in the tree.
    attr_accessor :ignored
  end

  # handler - object responding to push(hash).
  def initialize(handler)
    @handler = handler
    @root = nil
  end

  # Scans one directory, recursing into sub-directories. The first path
  # ever scanned becomes the root that reported names are relative to.
  #
  # Raises ArgumentError if path is not a directory.
  def scan_dir(path)
    # Trailing separators are dropped so the slice in #scan_file removes
    # exactly "<root>/" from each full path.
    @root ||= path.sub(%r{/+\z}, '')
    unless File.directory?(path)
      raise ArgumentError, "Not a directory: #{path}"
    end
    # Dir.foreach closes the directory handle when iteration finishes.
    Dir.foreach(path) do |entry|
      next if self.class.ignored.include?(entry)
      scan_file path, entry
    end
  end

  # Inspects a single directory entry: recurses into directories, pings
  # readable non-symlink entries, and ignores everything else.
  def scan_file(path, filename)
    full = File.join path, filename
    # lstat does not follow symlinks, so symlink? can actually be true.
    # With a plain stat the check below never fires: symlinked directories
    # get recursed into (looping on cyclic links) and dangling links raise.
    stat = File.lstat full
    if stat.directory?
      scan_dir full
    elsif !stat.symlink? && stat.readable?
      relative = full[@root.size+1..-1]
      # The client's own config file is never synced.
      return if relative == '_dropbear.json'
      @handler.push :cmd => :ping,
        :file => relative, :mtime => stat.mtime.to_i
    end
  end

  self.ignored = Set.new %w(node_modules vendor .git . ..)
end
end
require 'rubygems'
require 'em-zeromq'
require 'em-redis'
require 'riak/client'
require 'yajl'
Thread.abort_on_exception = true
module DropBear
# Sync server. Receives client commands on the socket it is attached to,
# tracks per-client file mtimes in Redis, stores uploaded files in the
# 'dropbear' Riak bucket and forwards each completed upload to the other
# connected clients.
#
# NOTE(review): uploads are accumulated in one shared buffer, so chunks of
# two files arriving interleaved (e.g. from two clients at once) would be
# mixed together. Confirm whether that can happen with the deployed
# topology before relying on this.
class Server
  # context - ZeroMQ context used to open PUB sockets towards clients.
  def initialize(context)
    @context = context
    @clients = {}        # client id => address the client listens on
    @queue = []
    @socket = nil
    @hash = {}           # fields merged into every message sent by #push
    @redis = EventMachine::Protocols::Redis.connect
    @riak = Riak::Client.new.bucket('dropbear')
    @buffered = nil      # header of the upload currently being buffered
    @buffered_size = nil # total size announced in that header
    @buffer = ''.dup
  end

  # Sends a message on the server's own socket, or queues it until
  # #on_writable has supplied one.
  def push(hash)
    if @socket
      hash = @hash.merge hash
      puts "PUSH: #{hash.inspect}"
      @socket.send_msg 'server', Yajl.dump(hash)
    else
      @queue << hash
    end
  end

  # 'chunk' command: appends the body to the upload buffer and saves the
  # file once the announced size has been received.
  def cmd_chunk(data, content)
    unless @buffered
      @buffered = data
      @buffered_size = data['size']
    end
    # A chunk header without a body yields nil; treat it as empty.
    @buffer << content.to_s
    # The client reports the file size in bytes, so compare bytes rather
    # than characters; otherwise multi-byte content never completes.
    save_file! if @buffer.bytesize >= @buffered_size
  end

  # 'ping' command: the client announced a file name and its mtime.
  def cmd_ping(data, content)
    check_file(data)
  end

  # 'connect' command: remembers the address the client listens on.
  def cmd_connect(data, content)
    @clients[data['id']] = data['addr']
  end

  # Compares the client's mtime with the one recorded in Redis under
  # "<set>:<id>" and requests or sends the file accordingly. Equal
  # timestamps need no action.
  def check_file(data)
    key = "#{data['set']}:#{data['id']}"
    @redis.hget key, data['file'] do |time_s|
      time = time_s.to_i
      # client data is newer
      if time < data['mtime']
        request_file(data['id'], data['file'])
      # server data is newer
      elsif time > data['mtime']
        send_file(data)
      end
    end
  end

  # client's file is newer, so ask for it
  #
  # Raises ArgumentError if the client never sent a 'connect' command.
  def request_file(client, file)
    unless @clients[client]
      raise ArgumentError, "no host for #{client}"
    end
    send_to_clients [client], Yajl.dump(:cmd => :send, :file => file)
  end

  # server's file is newer, so send it
  # TODO: not implemented.
  def send_file(data)
  end

  # saves a buffered file to riak, records the uploader's mtime, forwards
  # the file to the other clients and resets the upload buffer
  def save_file!
    puts "Saved #{@buffered['file']}"
    obj = Riak::RObject.new(@riak, "#{@buffered['set']}:#{@buffered['file']}")
    obj.content_type = 'text/plain'
    obj.data = @buffer
    obj.store
    send_update_client_state @buffered['set'], @buffered['id'],
      @buffered['file'], @buffered['mtime']
    update_other_clients @buffered['id'], obj, @buffered['file'], @buffered_size
    @buffered = @buffered_size = nil
    @buffer = ''.dup
  end

  # Records in Redis the mtime of the given client's copy of a file.
  def send_update_client_state(set, client, file, mtime)
    key = "#{set}:#{client}"
    @redis.hset(key, file, mtime)
  end

  # Sends a stored file, as a single 'chunk' message, to every connected
  # client except the one that uploaded it.
  def update_other_clients(client, file, name, size)
    other_clients = @clients.keys - [client]
    return if other_clients.empty?
    puts "Sending #{file.key} to #{other_clients * ", "}"
    header = {:cmd => :chunk, :file => name, :size => size}
    send_to_clients other_clients do |pub|
      pub.send_msg "#{Yajl.dump(header)}\n#{file.data}"
    end
  end

  # Opens a PUB socket connected to the given clients' addresses, sends
  # msg (if any), yields the socket (if a block is given), then closes it.
  def send_to_clients(clients, msg=nil)
    hosts = clients.map { |c| @clients[c] }
    host = hosts.shift
    return if !host
    pub = @context.connect ZMQ::PUB, host
    begin
      hosts.each { |h| pub.connect h }
      pub.send_msg msg if msg
      yield pub if block_given?
    ensure
      # Close the socket even when sending raised, so it is not leaked.
      pub.send :detach_and_close
    end
  end

  # Socket callback: parses each message (JSON header, optional "\n" and
  # body) and dispatches it to the matching cmd_* method.
  #
  # Raises ArgumentError for a command with no cmd_* method, instead of
  # letting a NoMethodError surface from the dynamic dispatch.
  def on_readable(socket, messages)
    messages.each do |msg|
      json, content = msg.copy_out_string.split("\n", 2)
      data = Yajl.load json
      command = "cmd_#{data['cmd']}"
      unless respond_to?(command)
        raise ArgumentError, "Unknown: #{data}"
      end
      send(command, data, content)
    end
  end

  # Socket callback: remembers the socket and flushes queued messages,
  # removing them from the queue so they are sent only once.
  def on_writable(socket)
    @socket = socket
    push(@queue.shift) until @queue.empty?
  end
end
end
# Server entry point: binds a PULL socket on port 5555 (all interfaces) and
# hands every incoming message to a DropBear::Server.
EM.run do
  zmq = EM::ZeroMQ::Context.new(1)
  server = DropBear::Server.new(zmq)
  zmq.bind(ZMQ::PULL, 'tcp://*:5555', server)
end
@technoweenie
Copy link
Author

I really wanted this to be a single file to start the server and the clients, so I tried using EventMachine. It turned out way more complicated than I'd like, forcing me to spend all my time on this stuff, and barely any on the Riak stuff. I found out at the meetup that DotCloud uses a ton of ZeroMQ, and I got a few pointers on how they design their ZeroMQ scripts. I also found out why my original attempt at using Router sockets for this failed miserably.

@technoweenie
Copy link
Author

The slides for this talk are on Dropbox :)

@technoweenie
Copy link
Author

@meetmatt
Copy link

The blog link is broken

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment