Skip to content

Instantly share code, notes, and snippets.

@djpate
Created October 16, 2015 02:56
Show Gist options
  • Save djpate/3f96a64f49bd278258d5 to your computer and use it in GitHub Desktop.
Save djpate/3f96a64f49bd278258d5 to your computer and use it in GitHub Desktop.
require 'benchmark'
module ActiveSupport
module Notifications
# This is a default queue implementation that ships with Notifications.
# It just pushes events to all registered log subscribers.
class Fanout
def initialize
@subscribers = []
@listeners_for = {}
@mutex = Mutex.new
end
def subscribe(pattern = nil, block = Proc.new)
subscriber = Subscriber.new(pattern, block)
@mutex.synchronize do
@subscribers << subscriber
@listeners_for.clear
end
subscriber
end
def unsubscribe(subscriber)
@mutex.synchronize do
@subscribers.reject! {|s| s.matches?(subscriber)}
@listeners_for.clear
end
end
def publish(name, *args)
listeners_for(name).each { |s| s.publish(name, *args) }
end
def listeners_for(name)
@mutex.synchronize do
@listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
end
end
def listening?(name)
listeners_for(name).any?
end
# This is a sync queue, so there is no waiting.
def wait
end
class Subscriber #:nodoc:
def initialize(pattern, delegate)
@pattern = pattern
@delegate = delegate
end
def publish(message, *args)
@delegate.call(message, *args)
end
def subscribed_to?(name)
!@pattern || @pattern === name.to_s
end
def matches?(subscriber_or_name)
self === subscriber_or_name ||
@pattern && @pattern === subscriber_or_name
end
end
end
end
end
puts Benchmark.measure {
fan = ActiveSupport::Notifications::Fanout.new
sub = []
pub = []
100_000.times do
sub << Thread.new do
s = fan.subscribe('foo') do
#noop
end
fan.unsubscribe(s)
end
pub << Thread.new do
fan.publish('foo', 'bar')
end
end
sub.each(&:join)
pub.each(&:join)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment