Skip to content

Instantly share code, notes, and snippets.

@ihoka
Forked from ezmobius/gist:262085
Created December 24, 2009 11:33
Show Gist options
  • Save ihoka/263155 to your computer and use it in GitHub Desktop.
Save ihoka/263155 to your computer and use it in GitHub Desktop.
require 'rubygems'
require 'redis'
require 'json'
require 'eventmachine'
class RedisLoop
class << self
attr_accessor :queues
def start(opts={}, &blk)
EM.run {
RedisLoop.queues = []
interval = opts.delete(:interval) || 0.5
redis = Redis.new(opts)
blk.call(redis)
EM.add_periodic_timer(interval) {
RedisLoop.queues.each do |q|
count = 0
while (count < 5) && (work = redis.lpop(q[0]))
count += 1
if Symbol === q[2]
q[1].__send__(q[2], work)
else
q[2].call(work)
end
end
end
}
}
end
end
end
class RedisAgent
def subscribe(queue, callback=nil, &blk)
RedisLoop.queues << [queue.to_s, self, (blk ? blk : callback)]
end
end
class MyWorker < RedisAgent
def initialize(redis)
@redis = redis
subscribe 'foo' do |work|
work_on_foo(work)
end
subscribe 'bar' do |work|
work_on_bar(work)
end
end
def work_on_foo(work)
p work
end
def work_on_bar(work)
p work
end
end
RedisLoop.start :host => '127.0.0.1' do |r|
MyWorker.new(r)
end
# in another process with redis loaded
redis.rpush 'foo', 'some work for foo'
redis.rpush 'bar', 'some work for bar'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment