Skip to content

Instantly share code, notes, and snippets.

@pope
Forked from ezmobius/gist:262085
Created December 22, 2009 22:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pope/262117 to your computer and use it in GitHub Desktop.
Save pope/262117 to your computer and use it in GitHub Desktop.
require 'rubygems'
require 'redis'
require 'json'
require 'eventmachine'
# Polls Redis lists from inside an EventMachine reactor and hands every popped
# item to the handler registered for that list.
#
# Registrations live in +RedisLoop.queues+ as three-element arrays:
# <tt>[queue_name, receiver, handler]</tt>. When +handler+ is a Symbol it is
# sent to +receiver+ with the work item; anything else is invoked with
# <tt>handler.call(work)</tt>.
class RedisLoop
  # Seconds between polling passes when no :interval option is given.
  DEFAULT_INTERVAL = 0.5

  # Maximum items popped from a single queue per polling pass when no
  # :batch_size option is given.
  DEFAULT_BATCH_SIZE = 5

  class << self
    # Array of [queue_name, receiver, handler] registrations. It is reset to
    # an empty array each time +start+ runs, just before the block is called.
    attr_accessor :queues

    # Starts the reactor, builds a Redis client, yields it to the block (which
    # is expected to register handlers), then polls every registered queue on
    # a periodic timer.
    #
    # opts - Hash of options. :interval (seconds between polls, default 0.5)
    #        and :batch_size (max items per queue per poll, default 5) are
    #        consumed here; every other key is passed to Redis.new. The
    #        caller's hash is not modified.
    # blk  - Required block; receives the Redis client.
    #
    # Raises ArgumentError when no block is given, before the reactor or the
    # Redis client are created.
    def start(opts = {}, &blk)
      raise ArgumentError, 'RedisLoop.start requires a block' unless blk

      # Work on a copy so removing our own keys does not alter the caller's hash.
      redis_opts = opts.dup
      interval = redis_opts.delete(:interval) || DEFAULT_INTERVAL
      batch_size = redis_opts.delete(:batch_size) || DEFAULT_BATCH_SIZE

      EM.run {
        RedisLoop.queues = []
        redis = Redis.new(redis_opts)
        blk.call(redis)
        EM.add_periodic_timer(interval) { drain_queues(redis, batch_size) }
      }
    end

    private

    # Pops up to +batch_size+ items from each registered queue, stopping early
    # for a queue as soon as lpop returns nil.
    def drain_queues(redis, batch_size)
      RedisLoop.queues.each do |name, receiver, handler|
        batch_size.times do
          work = redis.lpop(name)
          break unless work

          dispatch(receiver, handler, work)
        end
      end
    end

    # Delivers one work item: Symbol handlers are sent to the receiver, any
    # other handler is invoked through #call.
    def dispatch(receiver, handler, work)
      if handler.is_a?(Symbol)
        receiver.__send__(handler, work)
      else
        handler.call(work)
      end
    end
  end
end
# Base class for objects that register handlers for Redis lists with RedisLoop.
class RedisAgent
  # Registers a handler for +queue+ by appending
  # <tt>[queue.to_s, self, handler]</tt> to RedisLoop.queues.
  #
  # queue    - Queue name; converted with #to_s.
  # callback - Optional handler used when no block is given (for example a
  #            Symbol naming a method on this object, or a callable).
  # blk      - Optional block handler; takes precedence over +callback+.
  #
  # Returns RedisLoop.queues.
  # Raises ArgumentError when neither a block nor a callback is supplied, so
  # a missing handler is reported here rather than when work is dispatched.
  def subscribe(queue, callback = nil, &blk)
    handler = blk || callback
    raise ArgumentError, "subscribe to #{queue} needs a block or a callback" unless handler

    RedisLoop.queues << [queue.to_s, self, handler]
  end
end
# Example agent: listens on the 'foo' and 'bar' queues and prints each item
# it is handed.
class MyWorker < RedisAgent
  # Stores the Redis client and registers one handler block per queue.
  def initialize(redis)
    @redis = redis
    subscribe('foo') { |payload| work_on_foo(payload) }
    subscribe('bar') { |payload| work_on_bar(payload) }
  end

  # Prints the inspected form of an item taken from the 'foo' queue.
  def work_on_foo(work)
    p(work)
  end

  # Prints the inspected form of an item taken from the 'bar' queue.
  def work_on_bar(work)
    p(work)
  end
end
# Consumer side: start the polling loop against a local Redis server and
# create the worker, which registers its queue handlers. RedisLoop.start wraps
# its work in EM.run, so this call keeps running for as long as the reactor does.
RedisLoop.start :host => '127.0.0.1' do |r|
  MyWorker.new(r)
end

# Producer side: run the following from a *different* process that has its
# own Redis client bound to `redis`. They are left as comments because no
# `redis` variable exists at this point in this script.
#
#   redis.rpush 'foo', 'some work for foo'
#   redis.rpush 'bar', 'some work for bar'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment