Created
June 2, 2011 20:34
-
-
Save rosenfeld/1005245 to your computer and use it in GitHub Desktop.
Messaging API suggestion for Gitorious
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Gitorious::Messaging.default_implementation = :in_memory | |
Gitorious::Messaging.start | |
Gitorious::Messaging.publish_to_queue('queue_name', :something => 1) | |
Gitorious::Messaging.publish_to_queue('queue_name', :something => 2) | |
messages = [] | |
Gitorious::Messaging.subscribe_to_queue('queue_name'){|msg| puts "message: #{msg}"; messages << msg} | |
Gitorious::Messaging.publish_to_queue('queue_name', :something => 3) | |
sleep 1 | |
p messages # will contain all 3 messages |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# lib/gitorious/messaging/in_memory_messaging_implementation.rb | |
require 'thread' | |
module Gitorious | |
module Messaging | |
class InMemoryMessagingImplementation | |
attr_reader :running | |
def publish_to_queue(queue, payload, options = {}) | |
mutexes[queue].synchronize do | |
messages[queue] << payload | |
conditions[queue].signal | |
end | |
end | |
def subscribe_to_queue(queue) | |
Thread.new do | |
mutex = mutexes[queue] | |
msgs = messages[queue] | |
condition = conditions[queue] | |
while @running | |
message = nil | |
mutex.synchronize do | |
condition.wait(mutex) while (@running && !(message = msgs.shift)) | |
end | |
yield(message) if message | |
end | |
end | |
end | |
def start | |
@running = true | |
end | |
def stop | |
mutexes.each {|queue, mutex| mutex.broadcast } | |
@running = false | |
end | |
private | |
def messages | |
@messages ||= Hash.new {|h, k| h[k] = [] } | |
end | |
def mutexes | |
@mutexes ||= Hash.new {|h, k| h[k] = Mutex.new } | |
end | |
def conditions | |
@conditions ||= Hash.new {|h, k| h[k] = ConditionVariable.new } | |
end | |
def listeners | |
@listeners ||= Hash.new {|h, k| h[k] = [] } | |
end | |
end | |
register(:in_memory, InMemoryMessagingImplementation.new) | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# lib/gitorious/messaging.rb | |
require 'json' | |
module Gitorious | |
module Messaging | |
class << self | |
def register(implementation_symbol, implementation_class) | |
implementations[implementation_symbol] = implementation_class | |
end | |
def implementations | |
@implementations ||= {} | |
end | |
def default_implementation=(implementation) | |
implementations[:default] = implementation.is_a?(Symbol) ? implementations[implementation] : implementation | |
end | |
def default_implementation | |
implementations[:default] | |
end | |
def publish_to_queue(queue, payload, options = {}) | |
implementation = options.delete(:implementation) || :default | |
throw "You must call start before publishing to a queue" unless implementations[implementation].running | |
implementations[implementation].publish_to_queue queue, payload.to_json | |
end | |
def subscribe_to_queue(queue, options = {}, &handler) | |
raise "You need to pass a block that receives the message as a Hash" unless block_given? && handler.arity == 1 | |
implementation = options.delete(:implementation) || :default | |
throw "You must call start before subscribing to a queue" unless implementations[implementation].running | |
implementations[implementation].subscribe_to_queue(queue) do |message| | |
yield(JSON.parse(message)) | |
end | |
end | |
def start(implementation = :default) | |
implementations[implementation].start | |
end | |
def stop(implementation = :default) | |
implementations[implementation].stop | |
end | |
end | |
module Processor | |
def self.included(base) | |
base.send :include, Singleton | |
base.extend ClassMethods | |
end | |
module ClassMethods | |
def use_implementation(implementation) | |
@implementation = implementation | |
end | |
def subscribe_to_queue(queue) | |
Gitorious::Messaging.subscribe_to_queue(queue, :implementation => implementation) do |message| | |
begin | |
instance.on_message(message) | |
rescue Exception => exception | |
instance.on_error(exception) | |
end | |
end | |
end | |
private | |
def implementations | |
Gitorious::Messaging.implementations | |
end | |
def implementation | |
@implementation ||= :default | |
end | |
end | |
# on_message should be overriden. message is a Hash. | |
def on_message(message) | |
raise "Not implemented on_message method" | |
end | |
def on_error(exception) | |
raise exception | |
end | |
end | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class MyProcessor | |
include Gitorious::Messaging::Processor | |
subscribe_to_queue 'queue_name' | |
def on_message(message) | |
messages << message | |
end | |
def messages | |
@messages ||= [] | |
end | |
end | |
# usage example: | |
p MyProcessor.instance.messages # [] | |
Gitorious::Messaging.publish_to_queue('queue_name', :something => 1) | |
Gitorious::Messaging.publish_to_queue('queue_name', :something => 2) | |
sleep 1 | |
p MyProcessor.instance.messages # [{"something"=>1}, {"something"=>2}] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment