Created
November 19, 2011 19:30
-
-
Save unnu/1379254 to your computer and use it in GitHub Desktop.
Ringbuffer Implementation with Eventmachine
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rubygems' | |
require 'eventmachine' | |
require 'fiber' | |
class TcpReceiver < EM::Connection | |
attr_accessor :blocker | |
def initialize(receiver) | |
@receiver = receiver | |
super | |
end | |
def receive_data(data) | |
@receiver.write(data) | |
send_data <<-EOS | |
HTTP/1.0 200 OK | |
Content-Type: text/HTML | |
Content-Length: 2 | |
OK | |
EOS | |
close_connection_after_writing | |
end | |
end | |
class Actor | |
attr_reader :pointer | |
def initialize(buffer) | |
@blocker = [] | |
@buffer = buffer | |
@pointer = 0 | |
end | |
def add_blocker(blocker) | |
@blocker << blocker | |
@blocker.flatten! | |
end | |
private | |
def increase_pointer! | |
@pointer = next_pointer | |
end | |
def next_pointer | |
@pointer == @buffer.size - 1 ? 0 : @pointer + 1 | |
end | |
def blocked? | |
@blocker.each do |blocker| | |
return true if blocker.pointer == @pointer | |
end | |
false | |
end | |
def value | |
@buffer[@pointer] | |
end | |
def value=(data) | |
@buffer[@pointer] = data | |
end | |
end | |
class Consumer < Actor | |
def run | |
read_loop = proc do | |
unless blocked? | |
read(value) | |
increase_pointer! | |
end | |
EM.next_tick(read_loop) | |
end | |
read_loop.call | |
end | |
end | |
class Producer < Actor | |
def initialize(buffer) | |
super | |
end | |
def write(data) | |
write_loop = proc do | |
unless blocked? | |
increase_pointer! | |
self.value = data | |
else | |
EM.next_tick(write_loop) | |
end | |
end | |
write_loop.call | |
end | |
private | |
def blocked? | |
@blocker.each do |blocker| | |
return true if blocker.pointer == next_pointer | |
end | |
false | |
end | |
end | |
class Receiver < Producer | |
def run | |
EventMachine::start_server("127.0.0.1", 8081, TcpReceiver, self) | |
end | |
end | |
class Writer1 < Consumer | |
def read(data) | |
p "11111#{value}" | |
end | |
end | |
class Writer2 < Consumer | |
def read(data) | |
p "22222#{value}" | |
end | |
end | |
EventMachine.run { | |
buffer = Array.new(100) | |
writer1 = Writer1.new(buffer) | |
writer2 = Writer2.new(buffer) | |
receiver = Receiver.new(buffer) | |
writer1.add_blocker receiver | |
writer2.add_blocker receiver | |
receiver.add_blocker [writer1, writer1] | |
EventMachine::PeriodicTimer.new(1) do | |
p [receiver, writer1, writer2].map(&:pointer) | |
end | |
[receiver, writer1, writer2].map(&:run) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment