-
-
Save technoweenie/122849a52c5b33c5d890 to your computer and use it in GitHub Desktop.
Dropbear: crappy Dropbox "clone" (ZeroMQ, Riak)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rubygems' | |
require 'em-zeromq' | |
require 'yajl' | |
require File.expand_path('../dropbear_client_scanner', __FILE__) | |
Thread.abort_on_exception = true | |
module DropBear | |
class Client | |
def initialize(config) | |
@hash = {:id => config['id'], :set => config['set']} | |
@queue = [] | |
@socket = nil | |
push :cmd => :connect, :id => config['id'], :addr => config['bind'] | |
end | |
def push(hash, content = nil) | |
if @socket | |
hash = @hash.merge hash | |
puts "PUSH: #{hash.inspect}" | |
msg = Yajl.dump(hash) | |
if content | |
msg << "\n" << content | |
end | |
@socket.send_msg msg | |
else | |
@queue << [hash, content] | |
end | |
end | |
def on_writable(socket) | |
@socket = socket | |
@queue.each { |h, c| push(h, c) } | |
end | |
end | |
class Downloader | |
def initialize(handler, root) | |
@handler = handler | |
@root = root | |
end | |
def on_readable(socket, messages) | |
messages.each do |msg| | |
json, content = msg.copy_out_string.split("\n", 2) | |
header = Yajl.load(json) | |
full = "#{@root}/#{header['file']}" | |
case header['cmd'] | |
when 'send' | |
stat = File.stat full | |
File.open full do |f| | |
while chunk = f.read(16384) | |
@handler.push({:cmd => :chunk, :file => header['file'], | |
:size => stat.size, :mtime => stat.mtime.to_i}, | |
chunk) | |
end | |
end | |
when 'chunk' | |
File.open full, 'w' do |f| | |
f << content | |
end | |
puts "WROTE: #{full} (#{content.size})" | |
else | |
raise ArgumentError, "Unknown: #{header}" | |
end | |
end | |
end | |
end | |
end | |
config = Yajl.load IO.read('_dropbear.json') | |
root = File.expand_path(ARGV[0] || '.') | |
EM.run do | |
context = EM::ZeroMQ::Context.new 1 | |
handler = DropBear::Client.new config | |
downloader = DropBear::Downloader.new handler, root | |
socket = context.connect ZMQ::PUSH, config['host'], handler | |
download = context.bind ZMQ::SUB, 'tcp://*:'+config['bind'].sub(/.*:/,''), downloader | |
download.subscribe '' | |
scanner = DropBear::Scanner.new handler | |
scanner.scan_dir root | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'set' | |
module DropBear | |
class Scanner | |
class << self | |
attr_accessor :ignored | |
end | |
def initialize(handler) | |
@handler = handler | |
@root = nil | |
end | |
def scan_dir(path) | |
@root ||= path | |
if !File.directory?(path) | |
raise ArgumentError, "Not a directory: #{path}" | |
end | |
dir = Dir.new(path) | |
dir.each do |entry| | |
next if self.class.ignored.include?(entry) | |
scan_file path, entry | |
end | |
end | |
def scan_file(path, filename) | |
full = File.join path, filename | |
stat = File::Stat.new full | |
if stat.directory? | |
scan_dir full | |
elsif !stat.symlink? && stat.readable? | |
relative = full[@root.size+1..-1] | |
return if relative == '_dropbear.json' | |
@handler.push :cmd => :ping, | |
:file => relative, :mtime => stat.mtime.to_i | |
end | |
end | |
self.ignored = Set.new %w(node_modules vendor .git . ..) | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rubygems' | |
require 'em-zeromq' | |
require 'em-redis' | |
require 'riak/client' | |
require 'yajl' | |
Thread.abort_on_exception = true | |
module DropBear | |
class Server | |
def initialize(context) | |
@context = context | |
@clients = {} | |
@queue = [] | |
@socket = nil | |
@redis = EventMachine::Protocols::Redis.connect | |
@riak = Riak::Client.new.bucket('dropbear') | |
@buffered_file = nil | |
@buffered_size = nil | |
@buffer = '' | |
end | |
def push(hash) | |
if @socket | |
hash = @hash.merge hash | |
puts "PUSH: #{hash.inspect}" | |
@socket.send_msg 'server', Yajl.dump(hash) | |
else | |
@queue << hash | |
end | |
end | |
def cmd_chunk(data, content) | |
if !@buffered | |
@buffered = data | |
@buffered_size = data['size'] | |
end | |
@buffer << content | |
save_file! if @buffer.size >= @buffered_size | |
end | |
def cmd_ping(data, content) | |
check_file(data) | |
end | |
def cmd_connect(data, content) | |
@clients[data['id']] = data['addr'] | |
end | |
def check_file(data) | |
key = "#{data['set']}:#{data['id']}" | |
@redis.hget key, data['file'] do |time_s| | |
time = time_s.to_i | |
# client data is newer | |
if time < data['mtime'] | |
request_file(data['id'], data['file']) | |
# server data is newer | |
elsif time > data['mtime'] | |
send_file(data) | |
end | |
end | |
end | |
# client's file is newer, so ask for it | |
def request_file(client, file) | |
unless host = @clients[client] | |
raise ArgumentError, "no host for #{client}" | |
end | |
send_to_clients [client], Yajl.dump(:cmd => :send, :file => file) | |
end | |
# server's file is newer, so send it | |
def send_file(data) | |
end | |
# saves a buffered file to riak | |
def save_file! | |
puts "Saved #{@buffered['file']}" | |
obj = Riak::RObject.new(@riak, @buffered['set']+':'+@buffered['file']) | |
obj.content_type = 'text/plain' | |
obj.data = @buffer | |
obj.store | |
send_update_client_state @buffered['set'], @buffered['id'], | |
@buffered['file'], @buffered['mtime'] | |
update_other_clients @buffered['id'], obj, @buffered['file'], @buffered_size | |
@buffered = @buffered_size = nil | |
@buffer = '' | |
end | |
def send_update_client_state(set, client, file, mtime) | |
key = "#{set}:#{client}" | |
@redis.hset(key, file, mtime) | |
end | |
def update_other_clients(client, file, name, size) | |
other_clients = @clients.keys-[client] | |
return if other_clients.empty? | |
puts "Sending #{file.key} to #{other_clients * ", "}" | |
header = {:cmd => :chunk, :file => name, :size => size} | |
send_to_clients other_clients do |pub| | |
pub.send_msg "#{Yajl.dump(header)}\n#{file.data}" | |
end | |
end | |
def send_to_clients(clients, msg=nil) | |
hosts = clients.map { |c| @clients[c] } | |
host = hosts.shift | |
return if !host | |
pub = @context.connect ZMQ::PUB, host | |
hosts.each { |h| pub.connect h } | |
pub.send_msg msg if msg | |
yield pub if block_given? | |
pub.send :detach_and_close | |
end | |
def on_readable(socket, messages) | |
messages.each do |msg| | |
json, content = msg.copy_out_string.split("\n", 2) | |
data = Yajl.load json | |
send("cmd_#{data['cmd']}", data, content) | |
end | |
end | |
def on_writable(socket) | |
@socket = socket | |
@queue.each { |h| push(h) } | |
end | |
end | |
end | |
EM.run do | |
context = EM::ZeroMQ::Context.new 1 | |
handler = DropBear::Server.new context | |
socket = context.bind ZMQ::PULL, 'tcp://*:5555', handler | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I really wanted this to be a single file to start the server and the clients, so I tried using EventMachine. It turned out way more complicated than I'd like, forcing me to spend all my time on this stuff, and barely any on the Riak stuff. I found out at the meetup that DotCloud uses a ton of ZeroMQ, and I got a few pointers on how they design their ZeroMQ scripts. I also found out why my original attempt at using Router sockets for this failed miserably.