Skip to content

Instantly share code, notes, and snippets.

@rosenfeld
Created June 2, 2011 20:34
Show Gist options
  • Save rosenfeld/1005245 to your computer and use it in GitHub Desktop.
Save rosenfeld/1005245 to your computer and use it in GitHub Desktop.
Messaging API suggestion for Gitorious
Gitorious::Messaging.default_implementation = :in_memory
Gitorious::Messaging.start
Gitorious::Messaging.publish_to_queue('queue_name', :something => 1)
Gitorious::Messaging.publish_to_queue('queue_name', :something => 2)
messages = []
Gitorious::Messaging.subscribe_to_queue('queue_name'){|msg| puts "message: #{msg}"; messages << msg}
Gitorious::Messaging.publish_to_queue('queue_name', :something => 3)
sleep 1
p messages # will contain all 3 messages
# lib/gitorious/messaging/in_memory_messaging_implementation.rb
require 'thread'
module Gitorious
module Messaging
class InMemoryMessagingImplementation
attr_reader :running
def publish_to_queue(queue, payload, options = {})
mutexes[queue].synchronize do
messages[queue] << payload
conditions[queue].signal
end
end
def subscribe_to_queue(queue)
Thread.new do
mutex = mutexes[queue]
msgs = messages[queue]
condition = conditions[queue]
while @running
message = nil
mutex.synchronize do
condition.wait(mutex) while (@running && !(message = msgs.shift))
end
yield(message) if message
end
end
end
def start
@running = true
end
def stop
mutexes.each {|queue, mutex| mutex.broadcast }
@running = false
end
private
def messages
@messages ||= Hash.new {|h, k| h[k] = [] }
end
def mutexes
@mutexes ||= Hash.new {|h, k| h[k] = Mutex.new }
end
def conditions
@conditions ||= Hash.new {|h, k| h[k] = ConditionVariable.new }
end
def listeners
@listeners ||= Hash.new {|h, k| h[k] = [] }
end
end
register(:in_memory, InMemoryMessagingImplementation.new)
end
end
# lib/gitorious/messaging.rb
require 'json'
module Gitorious
module Messaging
class << self
def register(implementation_symbol, implementation_class)
implementations[implementation_symbol] = implementation_class
end
def implementations
@implementations ||= {}
end
def default_implementation=(implementation)
implementations[:default] = implementation.is_a?(Symbol) ? implementations[implementation] : implementation
end
def default_implementation
implementations[:default]
end
def publish_to_queue(queue, payload, options = {})
implementation = options.delete(:implementation) || :default
throw "You must call start before publishing to a queue" unless implementations[implementation].running
implementations[implementation].publish_to_queue queue, payload.to_json
end
def subscribe_to_queue(queue, options = {}, &handler)
raise "You need to pass a block that receives the message as a Hash" unless block_given? && handler.arity == 1
implementation = options.delete(:implementation) || :default
throw "You must call start before subscribing to a queue" unless implementations[implementation].running
implementations[implementation].subscribe_to_queue(queue) do |message|
yield(JSON.parse(message))
end
end
def start(implementation = :default)
implementations[implementation].start
end
def stop(implementation = :default)
implementations[implementation].stop
end
end
module Processor
def self.included(base)
base.send :include, Singleton
base.extend ClassMethods
end
module ClassMethods
def use_implementation(implementation)
@implementation = implementation
end
def subscribe_to_queue(queue)
Gitorious::Messaging.subscribe_to_queue(queue, :implementation => implementation) do |message|
begin
instance.on_message(message)
rescue Exception => exception
instance.on_error(exception)
end
end
end
private
def implementations
Gitorious::Messaging.implementations
end
def implementation
@implementation ||= :default
end
end
# on_message should be overriden. message is a Hash.
def on_message(message)
raise "Not implemented on_message method"
end
def on_error(exception)
raise exception
end
end
end
end
class MyProcessor
include Gitorious::Messaging::Processor
subscribe_to_queue 'queue_name'
def on_message(message)
messages << message
end
def messages
@messages ||= []
end
end
# usage example:
p MyProcessor.instance.messages # []
Gitorious::Messaging.publish_to_queue('queue_name', :something => 1)
Gitorious::Messaging.publish_to_queue('queue_name', :something => 2)
sleep 1
p MyProcessor.instance.messages # [{"something"=>1}, {"something"=>2}]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment