@tobi
Created May 29, 2010 14:24
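# Consumer: prints the running average throughput in messages per second.
STDOUT.sync = true
require_relative 'queue' # assumes the Queue class is saved as queue.rb alongside this script

start_time = Time.now.to_f
msg = 0
queue = Queue.new("testing")
queue.subscribe do |obj|
  msg += 1
  # Float timestamps avoid a zero elapsed time (and an "Inf" rate) during the first second.
  seconds = Time.now.to_f - start_time
  puts "%.3f ... " % [msg.to_f / seconds]
end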
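require 'rubygems'
require 'redis'
require 'digest/md5'
require 'msgpack' # needed: push and subscribe call MessagePack.pack / MessagePack.unpack

$redis = Redis.new

# Very efficient message queue.
# Performance: ~ 2100 messages per second on i7 iMac
#
# Note: on modern Rubies the top-level Queue constant is already defined
# (the built-in thread queue), so this reopens that class.
class Queue
  def initialize(name)
    @queue_name = "queue:#{name}"
  end

  # Remove every pending entry from the queue list.
  def clear
    $redis.del(@queue_name)
  end

  # Number of entries currently waiting in the queue list.
  def size
    $redis.llen(@queue_name)
  end

  # Store the packed payload under "msg:<md5>" and enqueue the hash.
  # The key is the MD5 of the payload's string form, so identical
  # payloads share a single "msg:" key.
  def push(object)
    hash = Digest::MD5.hexdigest(object.to_s)
    $redis.set("msg:#{hash}", MessagePack.pack(object))
    $redis.rpush(@queue_name, hash)
  end

  # Block forever, yielding each message to the caller's block.
  def subscribe
    loop do
      # blpop returns [list_name, value]; a timeout of 0 blocks until an entry arrives.
      hash = $redis.blpop(@queue_name, 0)[1]
      object = $redis.get("msg:#{hash}")
      if object
        begin
          yield MessagePack.unpack(object)
          # Done, remove message from redis.
          $redis.del("msg:#{hash}")
        rescue
          # Error, add the message again to the end of the queue
          $redis.rpush(@queue_name, hash)
          raise
        end
      end
    end
  end
end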
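One consequence of the key scheme in push is worth seeing in isolation: because the "msg:" key is the MD5 of object.to_s, two pushes of the same payload write to the same key, and the subscriber deletes that key after handling the first one. The sketch below reproduces only the key derivation, without Redis. The helper names content_key and unique_key are hypothetical and not part of the gist, and the SecureRandom variant is one possible alternative, not something the author uses.

```ruby
require 'digest/md5'
require 'securerandom'

# Same derivation as Queue#push: the key depends only on the payload's string form.
def content_key(object)
  "msg:#{Digest::MD5.hexdigest(object.to_s)}"
end

# Hypothetical alternative: a random ID per message, independent of the payload.
def unique_key
  "msg:#{SecureRandom.uuid}"
end

# Two pushes of the same payload map to one stored message.
puts content_key(1) == content_key(1)

# Payloads with the same string form also collide (1 and "1").
puts content_key(1) == content_key("1")

# Random IDs keep every pushed message distinct.
puts unique_key == unique_key
```

With content-derived keys, a second queued copy of a message can find its "msg:" entry already deleted, and subscribe skips it silently because of the `if object` guard. Whether that deduplication is wanted depends on the workload.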
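# Producer: pushes messages as fast as possible and prints the queue length.
require_relative 'queue' # assumes the Queue class is saved as queue.rb alongside this script

queue = Queue.new("testing")
queue.clear
loop do
  queue.push 1
  puts queue.size
end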