Skip to content

Instantly share code, notes, and snippets.

@phiggins
Created July 2, 2010 19:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save phiggins/461760 to your computer and use it in GitHub Desktop.
Save phiggins/461760 to your computer and use it in GitHub Desktop.
require 'eventmachine'
class MyP2PConnection < EM::Connection
def initialize(system)
@system = system
end
def receive_data(packet)
object = parse(packet)
port, ip = Socket.unpack_sockaddr_in(get_peername)
@system.receive_object(ip, port, object)
end
def send_object(host, port, object)
send_datagram(encode(object), host, port)
end
def parse(packet)
Marshal.load(packet) # or whatever
end
def encode(object)
Marshal.dump(object) # or whatever
end
end
class System
attr_reader :bind, :port
def initialize(port = 12345, bind = '0.0.0.0')
@bind, @port = bind, port
@peers = []
@transport = EM.open_datagram_socket(bind, port, MyP2PConnection, self)
end
def receive_object(ip, port, object)
case object[:type]
when :signon
log "#{ip}:#{port} signed on"
@peers << [ip, port]
when :signoff
# potential evil here
log "#{ip}:#{port} signed off"
@peers.delete [ip, port]
when :ping
log "#{ip}:#{port} pinged"
@transport.send_object(ip, port, :type => :echo)
when :echo
log "#{ip}:#{port} echod"
else
# handle bad stuff
end
end
def signon(ip, port)
log "send signon to #{ip}:#{port}"
@transport.send_object(ip, port, :type => :signon)
end
def signoff(ip, port)
log "send signoff to #{ip}:#{port}"
@transport.send_object(ip, port, :type => :signoff)
end
def ping(ip, port)
log "send ping to #{ip}:#{port}"
@transport.send_object(ip, port, :type => :ping)
end
def log(*args)
puts("#{bind}:#{port} : #{args.join(' ')}")
end
end
EM.run do
# Multi-node example:
#
num = 5
baseport = 12340
allports = (0..num).map { |i| baseport + i }
localhost = "127.0.0.1"
# For each of the nodes we want to make
num.times do |i|
port = baseport + i
# all other peers is everyone but this port
other_peers = allports - [port]
# Start the system on this port
system = System.new(port)
# We should use a state machine here inside the connection object, but
# simple example, we'll just wait a short time instead.
EM.add_timer(0.2) do
# Sign on to every other peer than the current node
other_peers.each do |port|
system.signon localhost, port
end
# After another short wait, send a ping to every node other than the
# current one
EM.add_timer(0.2) do
other_peers.each do |port|
system.ping localhost, port
end
end
end
end
# Simpler Example:
#
# system = System.new
# othersystem = System.new(12346)
#
# EM.add_timer(0.1) do
# system.signon "127.0.0.1", 12346
# othersystem.signon "127.0.0.1", 12345
# end
#
# EM.add_timer(0.2) do
# system.ping "127.0.0.1", 12346
# othersystem.ping "127.0.0.1", 12345
# end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment