
Simple Pub/Sub system using MongoDB capped collections and tailable cursors in Ruby

mongo_pubsub.rb
require 'rubygems'
require 'mongo'

module MongoPubSub
  # Despite the name, this is the database that holds one capped collection per queue.
  QUEUES_COLLECTION = 'queues'

  # Raised inside a subscriber block to end the subscription cleanly.
  class EndSubscriptionException < StandardError; end

  class Publisher
    def initialize(queue_name, mongo_connection)
      db = mongo_connection[QUEUES_COLLECTION]
      # Initialize queue collection as a capped collection.
      # MongoDB expects a byte size for capped collections; 1 MB is an arbitrary choice.
      unless db.collection_names.include?(queue_name)
        db.create_collection(queue_name, { :capped => true, :size => 1024 * 1024, :max => 20 })
      end
      @publish_collection = db[queue_name]
    end

    def push(message)
      # Use a microsecond timestamp as _id so subscribers can compare event ages
      document = { '_id' => (Time.now.to_f * 1000000).to_i, 'message' => message }
      @publish_collection.save(document)
    end
  end
  class Subscriber
    def initialize(queue_name, mongo_connection)
      @subscribed_collection = mongo_connection[QUEUES_COLLECTION][queue_name]
      # Find latest event in collection, we will ignore all earlier events
      latest = @subscribed_collection.find_one({}, :sort => [['$natural', -1]])
      # An empty queue has no latest event; accept every message in that case
      @earliest = latest ? latest['_id'].to_i : 0
    end

    # Yields each new message. Raise EndSubscriptionException in the block to stop.
    def each
      tail = Mongo::Cursor.new(@subscribed_collection, :tailable => true, :order => [['$natural', 1]])
      loop do
        doc = tail.next_document
        if doc && doc['_id'].to_i > @earliest
          begin
            yield doc['message']
          rescue EndSubscriptionException
            break
          end
        elsif doc
          # This event is too old, ignore it
        else
          # No event to process. Wait a bit.
          sleep(1)
        end
      end
    end
  end
end
 
 
if __FILE__ == $0
  # Simple Pub/Sub test.
  # Creates a publisher and a subscriber on the 'test' queue, sends one
  # message that is ignored, then four valid messages, and ends with a
  # special "Exit" message that stops the subscriber cleanly.

  m = Mongo::Connection.new('127.0.0.1', 27017)
  puts "Starting Publisher"
  publisher = MongoPubSub::Publisher.new('test', m)

  # This message should not be displayed: it predates the subscriber
  publisher.push("This message is too old, ignore me.")

  puts "Starting Subscriber"
  n = Mongo::Connection.new('127.0.0.1', 27017)
  s = MongoPubSub::Subscriber.new('test', n)

  subscriber_thread = Thread.new do
    puts "Now waiting for messages"
    s.each do |message|
      puts "Subscriber: received \"#{message}\""
      # Handle the exit message by ending the subscription
      if message == "Exit"
        raise MongoPubSub::EndSubscriptionException.new
      end
    end
    puts "Ending Subscriber !"
  end

  ['first event', 'second event', 'third event', 'fourth event', 'Exit'].each do |mes|
    puts "Publisher #{mes}"
    publisher.push(mes)
  end

  subscriber_thread.join
end

Rather than sleeping for a second after receiving a nil response, you can use the await_data option:

tail.add_option(Mongo::Constants::OP_QUERY_AWAIT_DATA)

When there are no additional documents in the cursor, mongod will wait for a few seconds for new documents before sending a response.
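
To make that suggestion concrete, here is a rough sketch of what the subscriber loop might look like once await_data is set. The helper name each_new_message and the FakeCursor stand-in are hypothetical and not part of the gist or the driver; the sketch only assumes a cursor object that responds to next_document, which is what the 1.x Mongo::Cursor used in the gist provides. With a real cursor you would call tail.add_option(Mongo::Constants::OP_QUERY_AWAIT_DATA) before passing it in.

```ruby
# Sketch only: `cursor` is anything that responds to #next_document, for
# example the tailable Mongo::Cursor from the gist after await_data is set.
def each_new_message(cursor, earliest)
  loop do
    doc = cursor.next_document
    # With await_data, a nil means the server has already waited for new
    # documents, so we poll again right away instead of calling sleep(1).
    next if doc.nil?
    # Skip events that predate the subscription
    next unless doc['_id'].to_i > earliest
    yield doc['message']
  end
end

# Hypothetical stand-in for a cursor so the sketch runs without a server.
class FakeCursor
  def initialize(docs)
    @docs = docs
  end

  def next_document
    @docs.shift
  end
end

docs = [
  { '_id' => 1, 'message' => 'too old' },
  nil,
  { '_id' => 2, 'message' => 'first event' },
  nil,
  { '_id' => 3, 'message' => 'Exit' }
]

received = []
each_new_message(FakeCursor.new(docs), 1) do |message|
  break if message == 'Exit' # leaving the block ends the loop
  received << message
end
puts received.inspect
```

Here a plain break in the block ends the subscription, so the EndSubscriptionException from the gist would not be needed with this shape of loop.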

How does this recover from pushing too many things into the queue?
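
As far as I can tell it does not recover. The queue is a capped collection created with :max => 20, so once it holds 20 documents each new insert pushes out the oldest one, and a subscriber that falls more than 20 messages behind never sees what was overwritten. A minimal in-memory sketch of that behavior, where CappedBuffer is a hypothetical stand-in and not a driver class:

```ruby
# Hypothetical in-memory model of a capped collection with a document limit.
class CappedBuffer
  def initialize(max)
    @max = max
    @docs = []
  end

  # Append a document, discarding the oldest ones once the limit is exceeded.
  def insert(doc)
    @docs << doc
    @docs.shift while @docs.size > @max
    doc
  end

  # _id values of the surviving documents, oldest first.
  def ids
    @docs.map { |d| d['_id'] }
  end
end

queue = CappedBuffer.new(20)
(1..25).each { |i| queue.insert('_id' => i, 'message' => "event #{i}") }

puts queue.ids.size   # 20
puts queue.ids.first  # 6, events 1 to 5 were overwritten
```

If losing messages matters, a larger :max (and :size) on the capped collection gives slow subscribers more room, at the cost of more storage.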

Why don't you make this a gem?
