Created
November 5, 2013 15:35
-
-
Save mateuszdw/7320912 to your computer and use it in GitHub Desktop.
A patch for em-synchrony's em-hiredis integration that incorporates em-hiredis commit https://github.com/mloughran/em-hiredis/commit/4b7ec0273790cfe491d686b97894e04297d07c64. This gist has not been fully tested.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Load em-hiredis up front; if the gem is not installed, replace the bare
# LoadError with a message telling the user which gem to install.
begin
  require 'em-hiredis'
rescue LoadError
  raise 'Missing EM-Synchrony dependency: gem install em-hiredis'
end
module EventMachine
  module Hiredis
    # Builds a client with `setup(uri)`, waits for its `connect` result via
    # EM::Synchrony.sync, and returns the client.
    #
    # NOTE(review): EM::Synchrony.sync presumably has to run inside a Fiber
    # (the :connected handler in BaseClient#connect wraps its body in one) —
    # confirm against em-synchrony.
    #
    # @param uri forwarded unchanged to `setup` (defined outside this file)
    # @return the client object produced by `setup`
    def self.connect(uri = nil)
      client = setup(uri)
      EM::Synchrony.sync client.connect
      client
    end

    # Re-opens the em-hiredis client so that connecting and issuing commands
    # go through EM::Synchrony.sync.
    class BaseClient
      # Instantiates a client for host/port, waits for `connect` via
      # EM::Synchrony.sync, and returns the instance.
      #
      # @param host [String] defaults to 'localhost'
      # @param port [Integer] defaults to 6379
      # @return [BaseClient] the new instance
      def self.connect(host = 'localhost', port = 6379)
        conn = new(host, port)
        EM::Synchrony.sync conn.connect
        conn
      end

      # Opens the EventMachine connection and registers the :closed,
      # :connected and :message handlers on it.
      #
      # @return [BaseClient] self
      def connect
        @auto_reconnect = true
        @connection = EM.connect(@host, @port, Connection, @host, @port)

        @connection.on(:closed) do
          if @connected
            # An established connection went away: fail every pending
            # deferrable and reset state before (optionally) reconnecting.
            @defs.each { |pending| pending.fail(Error.new("Redis disconnected")) }
            @defs = []
            @deferred_status = nil
            @connected = false
            if @auto_reconnect
              # Next tick avoids reconnecting after for example EM.stop
              EM.next_tick { reconnect }
            end
            emit(:disconnected)
            EM::Hiredis.logger.info("#{@connection} Disconnected")
          elsif @auto_reconnect
            # Closed without ever being marked connected: count it as a
            # failed (re)connect attempt and schedule another try.
            @reconnect_failed_count += 1
            @reconnect_timer = EM.add_timer(EM::Hiredis.reconnect_timeout) do
              @reconnect_timer = nil
              reconnect
            end
            emit(:reconnect_failed, @reconnect_failed_count)
            EM::Hiredis.logger.info("#{@connection} Reconnect failed")

            if @reconnect_failed_count >= 4
              # The retry timer above stays scheduled; this only signals
              # failure to listeners and to the client's own errbacks.
              emit(:failed)
              self.fail(Error.new("Could not connect after 4 attempts"))
            end
          end
        end

        @connection.on(:connected) do
          # Runs in its own Fiber. NOTE(review): presumably because `select`
          # and `auth` end up in the EM::Synchrony.sync-wrapped method_missing
          # below — confirm against em-hiredis.
          Fiber.new do
            @connected = true
            @reconnect_failed_count = 0
            @failed = false

            select(@db) unless @db == 0
            auth(@password) if @password

            # Flush the commands that were queued before the connection was up.
            @command_queue.each do |df, command, args|
              @connection.send_command(command, args)
              @defs.push(df)
            end
            @command_queue = []

            emit(:connected)
            EM::Hiredis.logger.info("#{@connection} Connected")
            succeed

            if @reconnecting
              @reconnecting = false
              emit(:reconnected)
            end
          end.resume
        end

        @connection.on(:message) do |reply|
          if reply.is_a?(RuntimeError)
            # Error replies fail the oldest pending deferrable.
            raise "Replies out of sync: #{reply.inspect}" if @defs.empty?

            deferred = @defs.shift
            error = RedisError.new(reply.message)
            error.redis_error = reply
            deferred.fail(error) if deferred
          else
            handle_reply(reply)
          end
        end

        @connected = false
        @reconnecting = false

        self
      end

      alias :old_method_missing :method_missing

      # Passes the call to the previous method_missing and waits on what it
      # returns via EM::Synchrony.sync. Any block given by the caller is
      # forwarded as well instead of being dropped.
      #
      # @param sym [Symbol] name of the missing method
      # @param args [Array] arguments of the call
      # @return whatever EM::Synchrony.sync returns for the result
      def method_missing(sym, *args, &blk)
        EM::Synchrony.sync old_method_missing(sym, *args, &blk)
      end
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment