Skip to content

Instantly share code, notes, and snippets.

@tlossen
Created October 18, 2012 19:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tlossen/3914390 to your computer and use it in GitHub Desktop.
Save tlossen/3914390 to your computer and use it in GitHub Desktop.
redis swarm daemon (proof-of-concept implementation)
#!/usr/bin/env ruby
require 'socket'
require 'thread'
require 'set'
# [host, port] of the local Redis server; splatted into TCPSocket.new.
REDIS = ['localhost', 6379].freeze
# [address, port] the inbound UDP socket binds to and the outbound socket sends to.
UDP = ['172.18.167.255', 7777].freeze
# Lower-case names of the Redis commands the daemon relays.
WHITELIST = %w[sadd srem].freeze
# A Set whose #add and #delete? calls are serialized through a Mutex so one
# instance can be shared between threads. Other Set methods are not wrapped.
class SafeSet < Set
  # Accepts the same arguments as Set.new.
  def initialize(*args)
    # Create the mutex before delegating: when an initial enumerable is
    # given, Set#initialize may fill the set through the overridden #add,
    # which needs the mutex to exist already.
    @mutex = Mutex.new
    super
  end

  # Adds x while holding the lock. Returns self, like Set#add.
  def add(x)
    @mutex.synchronize { super }
  end

  # Removes x while holding the lock.
  # Returns self if x was present, nil otherwise (like Set#delete?).
  def delete?(x)
    @mutex.synchronize { super }
  end
end
# Encodes a command as a Redis multi-bulk request:
#
#   *<argc>\r\n  then, per argument,  $<byte length>\r\n<argument>\r\n
#
# command - Array of command name and arguments; each element is converted
#           with #to_s before encoding.
#
# Returns the request as a String terminated by "\r\n".
#
# Note: defined at top level, this takes precedence over Kernel#format
# for the rest of the script.
def format(command)
  args = command.map(&:to_s)
  lines = ["*#{args.size}"]
  # The length prefix must be the byte count, not the character count,
  # otherwise multibyte arguments produce a corrupt request.
  args.each { |arg| lines << "$#{arg.bytesize}" << arg }
  lines.join("\r\n") << "\r\n"
end
# Splits a payload of double-quoted tokens (e.g. "sadd" "key" "member")
# into an Array with the contents of each token, in order.
#
# Empty tokens ("") are returned as empty strings, and a backslash-escaped
# character inside a token (such as \") does not end the token.
# Escape sequences are NOT decoded; the token text is returned as written.
#
# payload - String to tokenize.
#
# Returns an Array of Strings (empty when no quoted token is found).
def parse(payload)
  # A token body is any run of: a char that is neither quote nor backslash,
  # or a backslash followed by any one char.
  payload.scan(/"((?:[^"\\]|\\.)*)"/m).flatten
end
# Do not resolve peer addresses to host names on socket operations.
BasicSocket.do_not_reverse_lookup = true

# Payload registries shared by the two relay threads:
#   skip_inbound  - payloads the inbound (UDP) loop should drop once
#   skip_outbound - payloads the outbound (MONITOR) loop should drop once
skip_inbound, skip_outbound = SafeSet.new, SafeSet.new
# Inbound relay: receives command payloads over UDP and replays them
# against the local Redis server.
Thread.new do
  udp = UDPSocket.new
  udp.bind(*UDP)
  puts " inbound: listening to udp on #{UDP.join(':')}"
  redis = TCPSocket.new(*REDIS)
  loop do
    # 65_535 covers the largest possible UDP datagram; a smaller limit
    # silently truncates long payloads.
    data, = udp.recvfrom(65_535)
    # Drop the echo of a payload the outbound thread just sent.
    next if skip_inbound.delete?(data)
    # Datagrams are untrusted network input: only replay commands that are
    # on the whitelist, never arbitrary Redis commands.
    command = parse(data)
    name = command.first
    next unless name && WHITELIST.include?(name.downcase)
    puts " inbound: #{data}"
    # Register the payload before writing it, so the outbound thread can
    # drop it when it sees the same text on the Redis connection.
    skip_outbound.add(data)
    # format already terminates the request with "\r\n".
    # NOTE(review): replies from Redis are never read on this connection.
    redis.write(format(command))
  end
end
# Outbound relay: issues MONITOR on the local Redis server and sends every
# whitelisted command it reports to the UDP address. The script blocks on
# this thread until the Redis connection reaches end of stream.
outbound = Thread.new do
  broadcaster = UDPSocket.new
  broadcaster.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true)
  monitor = TCPSocket.new(*REDIS)
  monitor.puts(format(["monitor"]))
  puts "outbound: monitoring redis on #{REDIS.join(':')}"
  monitor.each_line do |line|
    # Everything from the first double quote to the end of the line.
    data = line[/\".*$/]
    next if data.nil?
    # Drop commands the inbound thread registered before writing to Redis.
    next if skip_outbound.delete?(data)
    name = parse(data).first
    next unless name && WHITELIST.include?(name.downcase)
    puts "outbound: #{data}"
    # Register before sending so the inbound loop can drop this payload
    # if it is received back.
    skip_inbound.add(data)
    host, port = UDP
    broadcaster.send(data, 0, host, port)
  end
end
outbound.join
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment