Skip to content

Instantly share code, notes, and snippets.

@unnu
Created November 19, 2011 19:30
Show Gist options
  • Save unnu/1379254 to your computer and use it in GitHub Desktop.
Save unnu/1379254 to your computer and use it in GitHub Desktop.
Ring buffer implementation with EventMachine
require 'rubygems'
require 'eventmachine'
require 'fiber'
# EventMachine connection handler: hands every chunk of received data to the
# receiver object given at construction time, answers with a fixed HTTP
# response, and closes the connection once that response has been written.
class TcpReceiver < EM::Connection
  # Fixed reply sent for every received chunk. HTTP separates lines with CRLF
  # and requires an empty line between the header section and the body;
  # without it the "OK" body would be read as a (malformed) header line.
  # The body is exactly the 2 bytes announced by Content-Length.
  RESPONSE = [
    'HTTP/1.0 200 OK',
    'Content-Type: text/HTML',
    'Content-Length: 2',
    '',
    'OK'
  ].join("\r\n").freeze

  attr_accessor :blocker

  # @param receiver [#write] object that is handed each chunk of incoming data
  def initialize(receiver)
    @receiver = receiver
    super
  end

  # Forwards +data+ to the receiver, then replies and schedules the close.
  #
  # @param data [String] bytes received on this connection
  def receive_data(data)
    @receiver.write(data)
    send_data RESPONSE
    close_connection_after_writing
  end
end
# Base class for the participants of the ring buffer. Every actor keeps its
# own position (+pointer+) into the shared buffer plus a list of other actors
# ("blockers") whose positions it compares against in +blocked?+.
class Actor
  attr_reader :pointer

  # @param buffer [Array] shared storage; its size defines the wrap-around point
  def initialize(buffer)
    @buffer = buffer
    @pointer = 0
    @blocker = []
  end

  # Registers a single blocker or an array of blockers. Nested arrays are
  # flattened in place, so the result is that of Array#flatten! (nil when
  # nothing had to be flattened).
  def add_blocker(blocker)
    @blocker.push(blocker)
    @blocker.flatten!
  end

  private

  # Moves the pointer one slot forward, wrapping at the end of the buffer.
  def increase_pointer!
    @pointer = next_pointer
  end

  # Index of the slot following the current one (0 after the last slot).
  def next_pointer
    last_slot = @buffer.size - 1
    return 0 if @pointer == last_slot

    @pointer + 1
  end

  # True when any registered blocker currently sits on this actor's slot.
  def blocked?
    @blocker.any? { |other| other.pointer == @pointer }
  end

  # Content of the slot the pointer refers to.
  def value
    @buffer[@pointer]
  end

  # Stores +data+ in the slot the pointer refers to.
  def value=(data)
    @buffer[@pointer] = data
  end
end
# Actor that reads from the buffer. Subclasses supply +read(data)+, which is
# invoked with the content of the current slot.
class Consumer < Actor
  # Runs one polling step immediately and re-schedules the same step on every
  # following reactor tick via EM.next_tick.
  def run
    poll = proc do
      drain_current_slot unless blocked?
      EM.next_tick(poll)
    end
    poll.call
  end

  private

  # Hands the current slot's content to +read+ and moves on to the next slot.
  def drain_current_slot
    read(value)
    increase_pointer!
  end
end
# Actor that writes into the buffer. Its pointer always refers to the next
# slot to be filled.
class Producer < Actor
  # Stores +data+ in the current slot and then advances the pointer. While
  # the producer is blocked, the attempt is retried on the next reactor tick.
  #
  # The slot is filled BEFORE the pointer moves: consumers in this file treat
  # the slot the producer's pointer rests on as off limits, so advancing
  # first would hide each freshly written item until the following write
  # (and would leave slot 0 unwritten on the first pass).
  #
  # @param data [Object] item to place in the buffer
  # @return [Object] +data+ when stored immediately, otherwise the result of
  #   EM.next_tick
  def write(data)
    write_loop = proc do
      if blocked?
        EM.next_tick(write_loop)
      else
        self.value = data
        increase_pointer!
        data
      end
    end
    write_loop.call
  end

  private

  # A producer is blocked when advancing would land on a slot a blocker is
  # still positioned at.
  def blocked?
    @blocker.any? { |blocker| blocker.pointer == next_pointer }
  end
end
# Producer that is fed over TCP: it starts an EventMachine server whose
# TcpReceiver connections are constructed with this object as their argument.
class Receiver < Producer
  # Starts the server on the loopback address, port 8081.
  def run
    host = "127.0.0.1"
    port = 8081
    EventMachine.start_server(host, port, TcpReceiver, self)
  end
end
# Consumer that prints every item it reads, prefixed with "11111".
class Writer1 < Consumer
  # Prints the item that was handed in (previously the argument was ignored
  # and the buffer was read a second time through the private +value+).
  #
  # @param data [Object] content of the slot being consumed
  def read(data)
    p "11111#{data}"
  end
end
# Consumer that prints every item it reads, prefixed with "22222".
class Writer2 < Consumer
  # Prints the item that was handed in (previously the argument was ignored
  # and the buffer was read a second time through the private +value+).
  #
  # @param data [Object] content of the slot being consumed
  def read(data)
    p "22222#{data}"
  end
end
# Wires one TCP-fed producer and two printing consumers to a shared
# 100-slot buffer and starts them inside the EventMachine reactor.
EventMachine.run do
  buffer = Array.new(100)
  writer1 = Writer1.new(buffer)
  writer2 = Writer2.new(buffer)
  receiver = Receiver.new(buffer)

  # Each consumer is held back by the producer, and the producer must be held
  # back by BOTH consumers; registering writer1 twice would let the producer
  # overwrite slots writer2 has not read yet.
  writer1.add_blocker receiver
  writer2.add_blocker receiver
  receiver.add_blocker [writer1, writer2]

  actors = [receiver, writer1, writer2]

  # Print all three pointers once per second.
  EventMachine::PeriodicTimer.new(1) do
    p actors.map(&:pointer)
  end

  actors.each(&:run)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment