Skip to content

Instantly share code, notes, and snippets.

@headius
Forked from wycats/chunked_array.rb
Created March 15, 2009 22:32
Show Gist options
  • Save headius/79563 to your computer and use it in GitHub Desktop.
Save headius/79563 to your computer and use it in GitHub Desktop.
class ChunkedArray
def initialize(bucket_size = 500)
@array, @bucket_size = [[]], bucket_size
end
def push(item)
if !@array.last || @array.last.size == @bucket_size
@array << []
end
@array.last << item
end
def [](index)
bucket = (index / @bucket_size)
idx = index % @bucket_size
if @array[bucket]
@array[bucket][idx]
end
end
def gc(lowest_valid_item)
bucket = (lowest_valid_item / @bucket_size) - 1
0.upto(bucket) do |b|
@array[b] = nil
end
end
end
describe "a chunked Array" do
before(:each) do
@array = ChunkedArray.new
end
it "works like an array for the first element" do
@array.push 1
@array[0].should == 1
end
it "works like an Array for the 750th element" do
750.times {|i| @array.push i }
@array[749].should == 749
end
it "continues to work after collecting a bucket" do
750.times {|i| @array.push i }
@array.gc(501)
@array[0].should == nil
@array[749].should == 749
end
end
require "chunked_array"
require "thread"
Thread.abort_on_exception = true
module Orchestra
Event = Struct.new(:name, :time, :thread_id, :payload)
class Stream
attr_reader :mutex, :signaler
def initialize
@stream = ChunkedArray.new
@mutex, @signaler = Mutex.new, ConditionVariable.new
@listeners = {}
end
def push(name, *payload)
@stream.push Event.new(name, Time.now, Thread.current.object_id, payload)
@signaler.broadcast
end
def at(pointer)
@stream[pointer]
end
def register_listener(listener)
@listeners[listener] = 0
end
def unregister_listener(listener)
@listeners.delete(listener)
end
def checkin(listener, pointer)
@listeners[listener] = pointer
min = @listeners.min {|a,b| a[1] <=> b[1]}[1]
@stream.gc(min)
end
end
class Listener
def initialize(stream, event_name)
@stream, @event_name, @pointer = stream, regex(event_name), 0
@stream.register_listener(self)
end
def disconnect!
@stream.unregister_listener(self)
end
def regex(name)
Regexp.new("^" << Regexp.escape(name).gsub(/\\\*/, ".*") << "$")
end
# this should only be called under lock
def wait
@stream.signaler.wait(@stream.mutex)
end
def next
if item = @stream.at(@pointer)
@pointer += 1
@stream.checkin(self, @pointer)
item.name =~ @event_name ? item : self.next
end
end
def consume
Thread.new do
loop {
@stream.mutex.synchronize do
loop { (event = self.next) ? break : wait }
end
yield event
}
end
end
end
end
describe "a listener with events on the queue", :shared => true do
it "is on the queue when Listener#next is called" do
event = @listener.next
event.should be_event(:name => (@name || "awesome"), :payload => ["orchestra"])
end
end
describe "Orchestra::Listener" do
class BeEvent
def initialize(options)
options[:thread_id] ||= Thread.current.object_id
@options = options
end
def matches?(event)
if !event
@error = "Expected event to exist"
elsif event.name != @options[:name]
@error = "Expected event's name to be #{@options[:name].inspect}, but was #{event.name.inspect}"
elsif !event.time.kind_of?(Time)
@error = "Expected event's time to be a Time object, but was #{event.time.inspect}"
elsif event.thread_id != @options[:thread_id]
@error = "Expected event's thread id to be #{@options[:thread_id]}, but was #{event.thread_id}"
elsif event.payload != @options[:payload]
@error = "Expected event's payload to be #{@options[:payload].inspect}, but was #{event.payload.inspect}"
end
!@error
end
def failure_message() @error end
end
def be_event(options)
BeEvent.new(options)
end
before(:each) do
@stream = Orchestra::Stream.new
end
it "is instantiated with a Stream and event name" do
o = Orchestra::Listener.new(@stream, "eventz")
end
describe "instantiated listening to the 'awesome' event" do
before(:each) do
@listener = Orchestra::Listener.new(@stream, "awesome")
end
it "returns nil from #next before anything has been pushed" do
@listener.next.should == nil
end
describe "when blocking on an event" do
before(:each) do
@test = []
Thread.new { @listener.wait; @test << @listener.next }
end
it "initially blocks" do
@test.should be_empty
end
it "unblocks when an event is pushed on the queue" do
@stream.push("awesome", "orchestra")
sleep 0.1
@test.first.should be_event(:payload => ["orchestra"], :name => "awesome")
end
end
describe "when consuming events" do
before(:each) do
@test = []
@listener.consume {|event| @test << event }
end
it "initially blocks" do
@test.should be_empty
end
it "unblocks when a matching event is pushed on the queue" do
@stream.push "awesome", "orchestra"
sleep 0.1
@test.first.should be_event(:payload => ["orchestra"], :name => "awesome")
end
it "unblocks when non-matching events are pushed, followed by matching ones" do
@stream.push "wot", "orchestra"
@stream.push "wot", "orchestra"
@stream.push "awesome", "orchestra"
sleep 0.1
@test.size.should == 1
@test.first.should be_event(:payload => ["orchestra"], :name => "awesome")
end
end
describe "when several listeners are consuming events" do
before(:each) do
@test1, @test2 = [], []
listener = Orchestra::Listener.new(@stream, "wot")
listener.consume {|event| @test1 << event}
listener = Orchestra::Listener.new(@stream, "awesome")
listener.consume {|event| @test2 << event}
end
it "initially blocks" do
@test1.should be_empty
@test2.should be_empty
end
it "unblocks all queues when matching events are pushed on" do
@stream.push "wot", "listener1"
@stream.push "awesome", "listener2"
sleep 0.1
@test1.size.should == 1
@test1.first.should be_event(:payload => ["listener1"], :name => "wot")
@test2.size.should == 1
@test2.first.should be_event(:payload => ["listener2"], :name => "awesome")
end
end
describe "when an event is added to the stream that matches" do
before(:each) do
@stream.push("awesome", "orchestra")
end
it_should_behave_like "a listener with events on the queue"
end
describe "when an event is added to the stream that doesn't match" do
before(:each) do
@stream.push("wot", "orchestra")
end
it "is not on the queue when Listener#next is called" do
event = @listener.next
event.should == nil
end
end
describe "when events are added that don't match, followed by one that does" do
before(:each) do
@stream.push("wot", "orchestra")
@stream.push("wot", "orchestra")
@stream.push("awesome", "orchestra")
end
it_should_behave_like "a listener with events on the queue"
end
end
describe "instantiated listening to the 'awesome.*'" do
before(:each) do
@listener = Orchestra::Listener.new(@stream, "awesome.*")
end
describe "when an event is added to the stream that matches" do
before(:each) do
@name = "awesome.rails"
@stream.push(@name, "orchestra")
end
it_should_behave_like "a listener with events on the queue"
end
describe "when an event is added to the stream that doesn't match" do
before(:each) do
@stream.push("awesomezrails", "orchestra")
end
it "is not on the queue when Listener#next is called" do
event = @listener.next
event.should == nil
end
end
end
describe "multiple listeners and many events" do
before(:each) do
@listeners = [
Orchestra::Listener.new(@stream, "wot"),
Orchestra::Listener.new(@stream, "wotwot")
]
1000.times do |i|
@stream.push("wot", "payload#{i}")
end
end
it "is safe when many events are added" do
@stream.at(0).should be_event(:name => "wot", :payload => ["payload#{0}"])
end
it "is safe when only one listener reads through the queue" do
@listeners[0].consume {}
sleep 0.1
@stream.at(0).should be_event(:name => "wot", :payload => ["payload#{0}"])
end
it "cleans up buckets once all listeners have read through the queue" do
consumers = Hash.new {|h,k| h[k] = []}
@listeners.each {|l| l.consume {|e| consumers[l] << e}}
sleep 0.1
@stream.at(0).should be_nil
consumers[@listeners[0]].size.should == 1000
consumers[@listeners[1]].size.should == 0
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment