Skip to content

Instantly share code, notes, and snippets.

@ezmobius
Created April 22, 2011 21:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ezmobius/937648 to your computer and use it in GitHub Desktop.
Save ezmobius/937648 to your computer and use it in GitHub Desktop.
require 'redis'
require 'json'
# RedActor: a minimal actor-style worker loop built on Redis lists.
#
# Actor subclasses register "mailboxes" (Redis list names). RedActor.run then
# blocks on every registered list with BLPOP and hands each JSON-decoded
# message either to the block given at registration time or to the actor's
# +receive_<queue>+ method.
module RedActor
  # Base class for message handlers. A fresh instance is created for every
  # message that is popped by RedActor.run.
  class Actor
    class << self
      # Registers +queue+ as a mailbox handled by this class.
      #
      # @param queue [#to_s] name of the Redis list to listen on
      # @param blk [Proc, nil] optional handler, invoked as
      #   +blk.call(actor, message)+; when omitted the run loop calls
      #   +receive_<queue>(message)+ on the actor instead
      # @return [Array] the stored +[handler_class, blk]+ pair
      def mailbox(queue, &blk)
        RedActor.queues[queue.to_s] = [self, blk]
      end
    end

    # Instance-level convenience that forwards to the class-level +mailbox+.
    def mailbox(queue, &blk)
      self.class.mailbox(queue, &blk)
    end

    # @param redis [Object] connection used by #publish (must respond to +rpush+)
    def initialize(redis)
      @redis = redis
    end

    # Appends +msg+, encoded as JSON, to the Redis list named "return".
    #
    # @param msg [#to_json] payload to publish
    def publish(msg)
      @redis.rpush('return', msg.to_json)
    end
  end

  class << self
    attr_writer :queues
    attr_accessor :redis

    # Registry of mailboxes: queue name (String) => [handler_class, block_or_nil].
    # Created lazily so it is usable before any mailbox has been registered.
    #
    # @return [Hash]
    def queues
      @queues ||= {}
    end

    # Builds the argument list for BLPOP: every registered queue name in a
    # random order, followed by +timeout+.
    #
    # The order is randomized on each call so that BLPOP sees the keys in a
    # different order every time and one busy queue cannot starve the rest.
    #
    # @param timeout [Integer] value appended as the last element
    # @return [Array] shuffled queue names followed by +timeout+
    def get_queues(timeout = 10)
      queues.keys.shuffle << timeout
    end

    # Connects to Redis and processes messages until the loop is interrupted.
    #
    # @param opts [Hash] options passed to +Redis.new+; +:timeout+ defaults
    #   to 15. The caller's hash is not modified.
    def run(opts = {})
      srand
      # Work on a copy so the default is not written into the caller's hash.
      opts = opts.merge(:timeout => opts[:timeout] || 15)
      connection = Redis.new(opts)
      self.redis = connection

      # BLPOP waits 5 less than opts[:timeout] -- presumably so it returns
      # before the client's own timeout fires (the same option goes to
      # Redis.new). Keep it at least 1: 0 would make BLPOP block forever and
      # a negative value is rejected by Redis.
      block_for = [opts[:timeout].to_i - 5, 1].max

      loop do
        queue, msg = connection.blpop(*get_queues(block_for))
        next unless queue && msg

        dispatch(connection, queue, msg)
      end
    end

    private

    # Decodes +msg+ and delivers it to the handler registered for +queue+.
    # Any StandardError raised while building the actor, parsing the JSON or
    # running the handler is printed and swallowed so that a single bad
    # message does not stop the worker loop.
    def dispatch(connection, queue, msg)
      klass, blk = queues[queue]
      actor = klass.new(connection)
      payload = JSON.parse(msg)
      if blk
        blk.call(actor, payload)
      else
        actor.__send__("receive_#{queue}", payload)
      end
    rescue => e
      p [:ERROR, e.message]
      puts e.backtrace.join("\n")
    end
  end
end
require 'redis'
require 'json'
# RedActor: a minimal actor-style worker loop built on Redis lists.
#
# Actor subclasses register "mailboxes" (Redis list names). RedActor.run then
# blocks on every registered list with BLPOP and hands each JSON-decoded
# message either to the block given at registration time or to the actor's
# +receive_<queue>+ method.
module RedActor
  # Base class for message handlers. A fresh instance is created for every
  # message that is popped by RedActor.run.
  class Actor
    class << self
      # Registers +queue+ as a mailbox handled by this class.
      #
      # @param queue [#to_s] name of the Redis list to listen on
      # @param blk [Proc, nil] optional handler, invoked as
      #   +blk.call(actor, message)+; when omitted the run loop calls
      #   +receive_<queue>(message)+ on the actor instead
      # @return [Array] the stored +[handler_class, blk]+ pair
      def mailbox(queue, &blk)
        RedActor.queues[queue.to_s] = [self, blk]
      end
    end

    # Instance-level convenience that forwards to the class-level +mailbox+.
    def mailbox(queue, &blk)
      self.class.mailbox(queue, &blk)
    end

    # @param redis [Object] connection used by #publish (must respond to +rpush+)
    def initialize(redis)
      @redis = redis
    end

    # Appends +msg+, encoded as JSON, to the Redis list named "return".
    #
    # @param msg [#to_json] payload to publish
    def publish(msg)
      @redis.rpush('return', msg.to_json)
    end
  end

  class << self
    attr_writer :queues
    attr_accessor :redis

    # Registry of mailboxes: queue name (String) => [handler_class, block_or_nil].
    # Created lazily so it is usable before any mailbox has been registered.
    #
    # @return [Hash]
    def queues
      @queues ||= {}
    end

    # Builds the argument list for BLPOP: every registered queue name in a
    # random order, followed by +timeout+.
    #
    # The order is randomized on each call so that BLPOP sees the keys in a
    # different order every time and one busy queue cannot starve the rest.
    #
    # @param timeout [Integer] value appended as the last element
    # @return [Array] shuffled queue names followed by +timeout+
    def get_queues(timeout = 10)
      queues.keys.shuffle << timeout
    end

    # Connects to Redis and processes messages until the loop is interrupted.
    #
    # @param opts [Hash] options passed to +Redis.new+; +:timeout+ defaults
    #   to 15. The caller's hash is not modified.
    def run(opts = {})
      srand
      # Work on a copy so the default is not written into the caller's hash.
      opts = opts.merge(:timeout => opts[:timeout] || 15)
      connection = Redis.new(opts)
      self.redis = connection

      # BLPOP waits 5 less than opts[:timeout] -- presumably so it returns
      # before the client's own timeout fires (the same option goes to
      # Redis.new). Keep it at least 1: 0 would make BLPOP block forever and
      # a negative value is rejected by Redis.
      block_for = [opts[:timeout].to_i - 5, 1].max

      loop do
        queue, msg = connection.blpop(*get_queues(block_for))
        next unless queue && msg

        dispatch(connection, queue, msg)
      end
    end

    private

    # Decodes +msg+ and delivers it to the handler registered for +queue+.
    # Any StandardError raised while building the actor, parsing the JSON or
    # running the handler is printed and swallowed so that a single bad
    # message does not stop the worker loop.
    def dispatch(connection, queue, msg)
      klass, blk = queues[queue]
      actor = klass.new(connection)
      payload = JSON.parse(msg)
      if blk
        blk.call(actor, payload)
      else
        actor.__send__("receive_#{queue}", payload)
      end
    rescue => e
      p [:ERROR, e.message]
      puts e.backtrace.join("\n")
    end
  end
end
# Example actor that listens on the "email" and "api_call" mailboxes.
# Handlers are plain methods named receive_<queue>; RedActor.run looks them
# up by that name when a mailbox is registered without a block.
class FooActor < RedActor::Actor
  attr_accessor :redis

  mailbox :email
  # Handles one message popped from the "email" list.
  def receive_email(msg)
    # msg is a hash; do whatever you want here,
    # then publish results if needed
    publish :email_id => 42, :status => :sent
  end

  mailbox :api_call
  # Handles one message popped from the "api_call" list.
  def receive_api_call(msg)
    # make an external api call with the contents of the msg hash
  end
  # The method was originally misspelled; keep the old name callable.
  alias_method :receieve_api_call, :receive_api_call
end

# Start the worker loop against a local Redis listening on port 4242.
RedActor.run(:host => 'localhost', :port => 4242)
# Example actor that listens on the "email" and "api_call" mailboxes.
# Handlers are plain methods named receive_<queue>; RedActor.run looks them
# up by that name when a mailbox is registered without a block.
class FooActor < RedActor::Actor
  attr_accessor :redis

  mailbox :email
  # Handles one message popped from the "email" list.
  def receive_email(msg)
    # msg is a hash; do whatever you want here,
    # then publish results if needed
    publish :email_id => 42, :status => :sent
  end

  mailbox :api_call
  # Handles one message popped from the "api_call" list.
  def receive_api_call(msg)
    # make an external api call with the contents of the msg hash
  end
  # The method was originally misspelled; keep the old name callable.
  alias_method :receieve_api_call, :receive_api_call
end

# Start the worker loop against a local Redis listening on port 4242.
RedActor.run(:host => 'localhost', :port => 4242)
@javierguerragiraldez
Copy link

check your language tagging... some say 'Ruby', most say 'Text', but all seem to be Python

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment