Skip to content

@octplane /mongo_pubsub.rb
Created

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Simple Pub/Sub system using MongoDB, capped collections and tailable cursors in Ruby
require 'rubygems'
require 'mongo'
# Minimal publish/subscribe layer built on MongoDB capped collections and
# tailable cursors. A Publisher appends messages to a named queue; a
# Subscriber tails that queue and yields every message published after the
# subscriber was created.
module MongoPubSub
  # Key looked up on the connection to reach the object holding the queue
  # collections. That object must answer #collection_names, #create_collection
  # and #[] (i.e. it is used like a database, despite the constant's name).
  QUEUES_COLLECTION = 'queues'

  # Raised from inside a Subscriber#each block to end the subscription.
  # NOTE: derives from Exception rather than StandardError, so a bare `rescue`
  # inside a subscriber block does not swallow it. Kept as-is for
  # compatibility with existing callers.
  class EndSubscriptionException < Exception; end

  # Writes messages to a queue, creating the backing capped collection on
  # first use.
  class Publisher
    # queue_name       - name of the queue (one collection per queue).
    # mongo_connection - object whose #[] returns the queues database.
    def initialize(queue_name, mongo_connection)
      queues = mongo_connection[QUEUES_COLLECTION]
      # Only create the collection when it is missing; it is created capped
      # with :max => 20.
      unless queues.collection_names.include?(queue_name)
        queues.create_collection(queue_name, { :capped => true, :max => 20 })
      end
      @publish_collection = queues[queue_name]
    end

    # Stores +message+ in the queue. The document's _id is the current time
    # in microseconds, which is what Subscriber compares against to decide
    # whether a message is new. Returns whatever the collection's #save
    # returns.
    def push(message)
      document = { '_id' => (Time.now.to_f * 1_000_000).to_i, 'message' => message }
      @publish_collection.save(document)
    end
  end

  # Tails a queue and yields the messages published after the subscriber was
  # created.
  class Subscriber
    # queue_name       - name of the queue to follow.
    # mongo_connection - object whose #[] returns the queues database.
    def initialize(queue_name, mongo_connection)
      @subscribed_collection = mongo_connection[QUEUES_COLLECTION][queue_name]
      # Remember the newest document present right now; anything at or before
      # it is considered stale. An empty queue yields no document, in which
      # case there is no cut-off and every later message is delivered
      # (previously this raised NoMethodError on nil).
      latest = @subscribed_collection.find_one({}, :sort => [['$natural', -1]])
      @earliest = latest ? latest['_id'] : nil
    end

    # Yields the 'message' field of each new document, in insertion order.
    # When the cursor has nothing to return, sleeps one second and polls
    # again, so this method only returns (nil) once the block raises
    # EndSubscriptionException. Any other exception from the block
    # propagates. Without a block, returns an Enumerator over the same
    # stream.
    def each
      return enum_for(:each) unless block_given?

      tail = Mongo::Cursor.new(@subscribed_collection, :tailable => true, :order => [['$natural', 1]])
      while true
        doc = tail.next_document
        if doc.nil?
          # No event to process. Wait a bit.
          sleep(1)
        elsif fresh?(doc)
          begin
            yield doc['message']
          rescue EndSubscriptionException
            break
          end
        end
        # Documents that are not fresh are silently skipped.
      end
    end

    private

    # True when +doc+ was stored after the cut-off captured at construction.
    def fresh?(doc)
      @earliest.nil? || doc['_id'].to_i > @earliest
    end
  end
end
if $PROGRAM_NAME == __FILE__
  # Pub/Sub smoke test against a MongoDB server on 127.0.0.1:27017.
  # A publisher and a subscriber share the 'test' queue. One message is
  # pushed before the subscriber exists (and must be ignored), then four
  # regular messages follow, and finally "Exit", which the subscriber block
  # turns into an EndSubscriptionException to stop cleanly.
  publisher_connection = Mongo::Connection.new('127.0.0.1', 27017)
  puts "Starting Publisher"
  publisher = MongoPubSub::Publisher.new('test', publisher_connection)
  # Pushed before the subscriber is created, so it should never be displayed.
  publisher.push("This message is too old, ignore me.")

  puts "Starting Subscriber"
  subscriber_connection = Mongo::Connection.new('127.0.0.1', 27017)
  subscriber = MongoPubSub::Subscriber.new('test', subscriber_connection)
  subscriber_thread = Thread.new do
    puts "Now waiting for messages"
    subscriber.each do |text|
      puts "Subscriber: received \"#{text}\""
      # The exit message ends the subscription.
      raise MongoPubSub::EndSubscriptionException if text == "Exit"
    end
    puts "Ending Subscriber !"
  end

  outgoing = ['first event', 'second event', 'third evend', 'fourth event', 'Exit']
  outgoing.each do |text|
    puts "Publisher #{text}"
    publisher.push(text)
  end

  # Wait until the subscriber has processed "Exit".
  subscriber_thread.join
end
@paulwalker

Rather than sleeping for a second after receiving a nil response, you can use the await_data option:

tail.add_option(Mongo::Constants::OP_QUERY_AWAIT_DATA)

When there are no additional documents in the cursor, mongod will wait for a few seconds for new documents before sending a response.

@bimargulies

How does this recover from pushing too many things into the queue?

@rafaelsales

Why don't you make this a gem?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.