Skip to content

Instantly share code, notes, and snippets.

@robertjpayne
Created December 29, 2013 15:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robertjpayne/8171587 to your computer and use it in GitHub Desktop.
Save robertjpayne/8171587 to your computer and use it in GitHub Desktop.
Make redis-rb play nice with Celluloid
require 'celluloid'
require 'celluloid/io'
require 'celluloid/redis'
# A ::Redis::Client subclass that can be shut down explicitly. Once it has
# been terminated, any further call to #process is silently ignored.
class CelluloidRedisClient < ::Redis::Client
  # opts is passed through untouched to ::Redis::Client#initialize.
  def initialize(opts={})
    # The flag is set before calling the parent constructor, so it is already
    # defined should the parent happen to call #process during setup.
    @terminated = false
    super
  end

  # Logs the arguments to stdout and hands them to the parent implementation.
  # Returns nil without doing anything once the client has been terminated.
  def process(*args)
    return if terminated?

    puts "PROCESSING #{args}"
    super
  end

  # True once #terminate has run (whether or not the quit command succeeded).
  def terminated?
    @terminated
  end

  # Issues a [[:quit]] command through #process, then marks the client as
  # terminated and disconnects. A ::Redis::ConnectionError raised while
  # quitting is swallowed; the flag and the disconnect happen regardless.
  def terminate
    process([[:quit]])
    @terminated = true
  rescue ::Redis::ConnectionError
    # Best-effort quit: the ensure clause below still performs the cleanup.
  ensure
    @terminated = true
    disconnect
  end
end
# A ::Redis subclass wired to a CelluloidRedisClient using the :celluloid
# driver, with an explicit #terminate for shutting the connection down.
class CelluloidRedis < ::Redis
  # Builds the underlying client with the :celluloid driver forced on.
  #
  # opts - Hash of client options, forwarded to CelluloidRedisClient.new.
  #        Any :driver entry supplied by the caller is overridden.
  #
  # The options are merged into a new hash so the caller's hash is left
  # unmodified. Note that this constructor does not call super, so
  # ::Redis#initialize is not run.
  def initialize(opts={})
    client_opts = opts.merge(driver: :celluloid)
    @original_client = @client = CelluloidRedisClient.new(client_opts)
  end

  # Yields the current client to the block, if one is given. This override
  # performs no locking of its own.
  def synchronize
    yield @client if block_given?
  end

  # Terminates the client created in #initialize unless it already reports
  # itself as terminated.
  def terminate
    @original_client.terminate unless @original_client.terminated?
  end
end
# Celluloid::IO actor that owns a single CelluloidRedis connection and
# terminates it from its finalizer.
class Actor
  include Celluloid::IO
  finalizer :shutdown

  # Creates the actor's own redis connection with default options.
  def initialize
    @redis = ::CelluloidRedis.new
  end

  # Subscribes to +channel+ and prints every message received on it.
  def subscribe(channel)
    puts "subscribe: #{channel}"
    @redis.subscribe(channel) do |on|
      # Block parameters are named differently from the method parameter so
      # that the outer +channel+ is not shadowed.
      on.message { |source, payload| puts "#{@redis} => #{source}: #{payload}" }
    end
  end

  # Publishes +message+ on +channel+, logging the call first.
  def publish(channel, message)
    puts "publish: #{channel} #{message}"
    @redis.publish(channel, message)
  end

  # Finalizer: terminates the redis connection (when there is one) and drops
  # the reference to it.
  def shutdown
    puts "REDIS: #{@redis}"
    @redis && @redis.terminate
    puts "TERMINATED"
    @redis = nil
  end
end
# Demo: one actor subscribes to a channel, a second actor publishes to it,
# then the subscribing actor is terminated.
subscriber = Actor.new
subscriber.async.subscribe('backd/ping')
# subscriber.async.subscribe('backd/pong')
sleep(2) # presumably leaves time for the asynchronous subscribe to take effect

publisher = Actor.new
publisher.async.publish('backd/ping', 'Hello World!')
sleep(2) # presumably leaves time for the message to arrive and be printed

subscriber.terminate
# With no argument, sleep suspends the main thread indefinitely, which keeps
# the process running after the subscriber has been terminated.
sleep
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment