@xer0x
Forked from octplane/mongo_pubsub.rb
Created June 25, 2012 21:13
Simple Pub/Sub system in Ruby using MongoDB capped collections and tailable cursors
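require 'rubygems'
require 'mongo'

# Written against the legacy 1.x mongo driver (Mongo::Connection API).
module MongoPubSub
  # Name of the database that holds one capped collection per queue.
  QUEUES_COLLECTION = 'queues'

  # Raise this from a Subscriber#each block to end the subscription cleanly.
  class EndSubscriptionException < StandardError; end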
  class Publisher
    def initialize(queue_name, mongo_connection)
      db = mongo_connection[QUEUES_COLLECTION]
      # Create the queue as a capped collection if it does not exist yet.
      # Capped collections need a byte :size (1 MiB here is an arbitrary
      # choice); :max additionally limits the number of documents kept.
      unless db.collection_names.include?(queue_name)
        db.create_collection(queue_name, :capped => true, :size => 1_048_576, :max => 20)
      end
      @publish_collection = db[queue_name]
    end

    def push(message)
      # Microsecond timestamp as _id; subscribers compare it to skip old events.
      document = { '_id' => (Time.now.to_f * 1_000_000).to_i, 'message' => message }
      @publish_collection.save(document)
    end
  end
  class Subscriber
    def initialize(queue_name, mongo_connection)
      @subscribed_collection = mongo_connection[QUEUES_COLLECTION][queue_name]
      # Remember the newest event already in the collection; everything up to
      # and including it is ignored. find_one returns nil on an empty
      # collection, so fall back to 0 in that case.
      latest = @subscribed_collection.find_one({}, :sort => [['$natural', -1]])
      @earliest = latest ? latest['_id'] : 0
    end

    def each
      tail = Mongo::Cursor.new(@subscribed_collection, :tailable => true, :order => [['$natural', 1]])
      loop do
        doc = tail.next_document
        if doc.nil?
          # No event to process. Wait a bit.
          sleep(1)
        elsif doc['_id'].to_i > @earliest
          begin
            yield doc['message']
          rescue EndSubscriptionException
            break
          end
        end
        # Any other document is too old and is ignored.
      end
    end
  end
end
if __FILE__ == $0
  # Simple Pub/Sub test:
  # create a publisher and a subscriber on the 'test' queue,
  # send one message that is ignored, then four valid messages,
  # and finish with a special "Exit" message that stops the
  # subscriber cleanly.
  m = Mongo::Connection.new('127.0.0.1', 27017)
  puts "Starting Publisher"
  publisher = MongoPubSub::Publisher.new('test', m)
  # This message should not be displayed.
  publisher.push("This message is too old, ignore me.")
  puts "Starting Subscriber"
  n = Mongo::Connection.new('127.0.0.1', 27017)
  s = MongoPubSub::Subscriber.new('test', n)
  subscriber = Thread.new do
    puts "Now waiting for messages"
    s.each do |message|
      puts "Subscriber: received \"#{message}\""
      # Handle the exit message: raising ends the subscription.
      if message == "Exit"
        raise MongoPubSub::EndSubscriptionException.new
      end
    end
    puts "Ending Subscriber !"
  end
  ['first event', 'second event', 'third event', 'fourth event', 'Exit'].each do |mes|
    puts "Publisher #{mes}"
    publisher.push(mes)
  end
  subscriber.join
end
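
The capped-collection and tailable-cursor pattern used above can be tried without a MongoDB server. The following is only a rough in-memory sketch of the idea, not the MongoDB API: `CappedLog` and `TailCursor` are hypothetical names invented for this illustration. The log keeps at most `max` documents and evicts the oldest, and the cursor remembers its position so each poll returns only what was appended since the previous one, much like the subscriber skipping events older than `@earliest`.

```ruby
# In-memory stand-in for a capped collection (hypothetical, not MongoDB).
class CappedLog
  def initialize(max)
    @max = max
    @docs = []
    @dropped = 0 # number of documents evicted so far
  end

  # Append a document, evicting the oldest one once the cap is exceeded.
  def push(doc)
    @docs << doc
    if @docs.size > @max
      @docs.shift
      @dropped += 1
    end
    self
  end

  # Absolute position just past the newest document.
  def tail_position
    @dropped + @docs.size
  end

  # Documents at absolute positions >= position, plus the new tail position.
  def read_from(position)
    start = [position - @dropped, 0].max
    [@docs[start..-1] || [], tail_position]
  end
end

# Tailable-cursor analogue: remembers where it stopped reading.
class TailCursor
  def initialize(log)
    @log = log
    @position = log.tail_position # skip everything already stored
  end

  # Documents appended since the last call (empty array if none).
  def poll
    docs, @position = @log.read_from(@position)
    docs
  end
end

log = CappedLog.new(3)
log.push('too old')               # stored before the cursor exists
cursor = TailCursor.new(log)
%w[first second].each { |m| log.push(m) }
puts cursor.poll.inspect          # prints ["first", "second"]
puts cursor.poll.inspect          # prints []
```

Unlike this sketch, a real tailable cursor is served by the database, and a slow reader can silently miss documents that were overwritten before it caught up; the sketch shows the same effect when more than `max` documents arrive between two polls.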