Skip to content

Instantly share code, notes, and snippets.

@evanphx
Created February 20, 2012 22:46
Show Gist options
  • Save evanphx/1872048 to your computer and use it in GitHub Desktop.
Save evanphx/1872048 to your computer and use it in GitHub Desktop.
dir = File.expand_path("..", __FILE__)
require File.join(dir, "redis_protocol")
require 'pp'
class APIFront
GEM_STORE = "http://production.s3.rubygems.org"
def initialize
@redis = nil
@args = {}
if h = ENV['REDIS_HOST']
@args[:host] = h
end
end
def redis
@redis ||= EM::Protocols::Redis.connect(@args).tap do |r|
r.on_error do
puts "error!"
end
end
end
def call(env)
rp = env["PATH_INFO"]
case rp
when %r!^/gems/(.*)\.gem$!
full_name = $1
redis.hget("v:#{full_name}", "name") do |name|
if name
today = Time.now.utc.strftime "%Y-%m-%d"
commands = [
%W!incr downloads!,
%W!incr downloads:rubygem:#{name}!,
%W!incr downloads:version:#{full_name}!,
%W!zincrby downloads:today:#{today} 1 #{full_name}!,
%W!zincrby downloads:all 1 #{full_name}!,
%W!hincrby downloads:version_history:#{full_name} #{today} 1!,
%W!hincrby downloads:rubygem_history:#{name} #{today} 1!
]
redis.call_commands commands
end
end
return [302, {"Location" => "#{GEM_STORE}#{rp}"}, [""]]
when %r!^/quick/Marshal\.4\.8/(.*)\.gemspec\.rz$!
return [302, {"Location" => "http://#{GEM_STORE}#{rp}"}, [""]]
else
return [302, {"Location" => "http://rubygems.org#{rp}"}, [""]]
end
end
end
api = APIFront.new
run api
require 'eventmachine'
module EventMachine
module Protocols
module Redis
include EM::Deferrable
##
# constants
#########################
OK = "OK".freeze
MINUS = "-".freeze
PLUS = "+".freeze
COLON = ":".freeze
DOLLAR = "$".freeze
ASTERISK = "*".freeze
DELIM = "\r\n".freeze
BULK_COMMANDS = {
"set" => true,
"setnx" => true,
"rpush" => true,
"lpush" => true,
"lset" => true,
"lrem" => true,
"sadd" => true,
"srem" => true,
"sismember" => true,
"echo" => true,
"getset" => true,
"smove" => true,
"zadd" => true,
"zincrby" => true,
"zrem" => true,
"zscore" => true,
}
MULTI_BULK_COMMANDS = {
"mset" => true,
"msetnx" => true,
# these aliases aren't in redis gem
"multi_get" => true
}
BOOLEAN_PROCESSOR = lambda{|r| %w(1 OK).include? r.to_s }
REPLY_PROCESSOR = {
"exists" => BOOLEAN_PROCESSOR,
"sismember" => BOOLEAN_PROCESSOR,
"sadd" => BOOLEAN_PROCESSOR,
"srem" => BOOLEAN_PROCESSOR,
"smove" => BOOLEAN_PROCESSOR,
"zadd" => BOOLEAN_PROCESSOR,
"zrem" => BOOLEAN_PROCESSOR,
"move" => BOOLEAN_PROCESSOR,
"setnx" => BOOLEAN_PROCESSOR,
"del" => BOOLEAN_PROCESSOR,
"renamenx" => BOOLEAN_PROCESSOR,
"expire" => BOOLEAN_PROCESSOR,
"select" => BOOLEAN_PROCESSOR, # not in redis gem
"keys" => lambda{|r| r.split(" ")},
"info" => lambda{|r|
info = {}
r.each_line {|kv|
k,v = kv.split(":",2).map{|x| x.chomp}
info[k.to_sym] = v
}
info
}
}
ALIASES = {
"flush_db" => "flushdb",
"flush_all" => "flushall",
"last_save" => "lastsave",
"key?" => "exists",
"delete" => "del",
"randkey" => "randomkey",
"list_length" => "llen",
"push_tail" => "rpush",
"push_head" => "lpush",
"pop_tail" => "rpop",
"pop_head" => "lpop",
"list_set" => "lset",
"list_range" => "lrange",
"list_trim" => "ltrim",
"list_index" => "lindex",
"list_rm" => "lrem",
"set_add" => "sadd",
"set_delete" => "srem",
"set_count" => "scard",
"set_member?" => "sismember",
"set_members" => "smembers",
"set_intersect" => "sinter",
"set_intersect_store" => "sinterstore",
"set_inter_store" => "sinterstore",
"set_union" => "sunion",
"set_union_store" => "sunionstore",
"set_diff" => "sdiff",
"set_diff_store" => "sdiffstore",
"set_move" => "smove",
"set_unless_exists" => "setnx",
"rename_unless_exists" => "renamenx",
"type?" => "type",
"zset_add" => "zadd",
"zset_count" => "zcard",
"zset_range_by_score" => "zrangebyscore",
"zset_reverse_range" => "zrevrange",
"zset_range" => "zrange",
"zset_delete" => "zrem",
"zset_score" => "zscore",
"zset_incr_by" => "zincrby",
"zset_increment_by" => "zincrby",
# these aliases aren't in redis gem
"background_save" => 'bgsave',
"async_save" => 'bgsave',
"members" => 'smembers',
"decrement_by" => "decrby",
"decrement" => "decr",
"increment_by" => "incrby",
"increment" => "incr",
"set_if_nil" => "setnx",
"multi_get" => "mget",
"random_key" => "randomkey",
"random" => "randomkey",
"rename_if_nil" => "renamenx",
"tail_pop" => "rpop",
"pop" => "rpop",
"head_pop" => "lpop",
"shift" => "lpop",
"list_remove" => "lrem",
"index" => "lindex",
"trim" => "ltrim",
"list_range" => "lrange",
"range" => "lrange",
"list_len" => "llen",
"len" => "llen",
"head_push" => "lpush",
"unshift" => "lpush",
"tail_push" => "rpush",
"push" => "rpush",
"add" => "sadd",
"set_remove" => "srem",
"set_size" => "scard",
"member?" => "sismember",
"intersect" => "sinter",
"intersect_and_store" => "sinterstore",
"members" => "smembers",
"exists?" => "exists"
}
DISABLED_COMMANDS = {
"monitor" => true,
"sync" => true
}
def []=(key,value)
set(key,value)
end
def set(key, value, expiry=nil)
call_command([:set, key, value]) do |s|
yield s if block_given?
end
expire(key, expiry) if expiry
end
def sort(key, options={}, &blk)
cmd = ["SORT"]
cmd << key
cmd << "BY #{options[:by]}" if options[:by]
cmd << "GET #{[options[:get]].flatten * ' GET '}" if options[:get]
cmd << "#{options[:order]}" if options[:order]
cmd << "LIMIT #{options[:limit].join(' ')}" if options[:limit]
call_command(cmd, &blk)
end
def incr(key, increment = nil, &blk)
call_command(increment ? ["incrby",key,increment] : ["incr",key], &blk)
end
def decr(key, decrement = nil, &blk)
call_command(decrement ? ["decrby",key,decrement] : ["decr",key], &blk)
end
def select(db, &blk)
@db = db.to_i
call_command(['select', @db], &blk)
end
def auth(password, &blk)
@password = password
call_command(['auth', password], &blk)
end
# Similar to memcache.rb's #get_multi, returns a hash mapping
# keys to values.
def mapped_mget(*keys)
mget(*keys) do |response|
result = {}
response.each do |value|
key = keys.shift
result.merge!(key => value) unless value.nil?
end
yield result if block_given?
end
end
# Ruby defines a now deprecated type method so we need to override it here
# since it will never hit method_missing
def type(key, &blk)
call_command(['type', key], &blk)
end
def quit(&blk)
call_command(['quit'], &blk)
end
def errback(&blk)
@error_callback = blk
end
alias_method :on_error, :errback
def method_missing(*argv, &blk)
call_command(argv, &blk)
end
def call_command(argv, &blk)
callback { raw_call_command(argv, &blk) }
end
def raw_call_command(argv, &blk)
argv[0] = argv[0].to_s unless argv[0].kind_of? String
argv[0] = argv[0].downcase
send_command(argv)
@redis_callbacks << [REPLY_PROCESSOR[argv[0]], blk]
end
def call_commands(argvs, &blk)
callback { raw_call_commands(argvs, &blk) }
end
def raw_call_commands(argvs, &blk)
if argvs.empty? # Shortcut
blk.call []
return
end
argvs.each do |argv|
argv[0] = argv[0].to_s unless argv[0].kind_of? String
send_command argv
end
# FIXME: argvs may contain heterogenous commands, storing all
# REPLY_PROCESSORs may turn out expensive and has been omitted
# for now.
@redis_callbacks << [nil, argvs.length, blk]
end
def send_command(argv)
orig = argv
argv = argv.dup
if MULTI_BULK_COMMANDS[argv.flatten[0].to_s]
# TODO improve this code
argvp = argv.flatten
values = argvp.pop.to_a.flatten
argvp = values.unshift(argvp[0])
command = ["*#{argvp.size}"]
argvp.each do |v|
v = v.to_s
command << "$#{get_size(v)}"
command << v
end
command = command.map {|cmd| "#{cmd}\r\n"}.join
else
command = ""
bulk = nil
argv[0] = argv[0].to_s.downcase
argv[0] = ALIASES[argv[0]] if ALIASES[argv[0]]
raise "#{argv[0]} command is disabled" if DISABLED_COMMANDS[argv[0]]
command << "*#{argv.size}\r\n"
# if BULK_COMMANDS[argv[0]] and argv.length > 1
# bulk = argv[-1].to_s
# argv[-1] = get_size(bulk)
# end
argv.each do |a|
command << "$#{a.size}\r\n#{a}\r\n"
end
end
# @logger.debug { "*** sending: #{command}" } if @logger
p orig
puts "*** sending: #{command.inspect}"
send_data command
end
##
# errors
#########################
class ParserError < StandardError; end
class ProtocolError < StandardError; end
class RedisError < StandardError
attr_accessor :code
end
##
# em hooks
#########################
def self.connect(*args)
case args.length
when 0
options = {}
when 1
arg = args.shift
case arg
when Hash then options = arg
when String then options = {:host => arg}
else raise ArgumentError, 'first argument must be Hash or String'
end
when 2
options = {:host => args[1], :port => args[2]}
else
raise ArgumentError, "wrong number of arguments (#{args.length} for 1)"
end
options[:host] ||= '127.0.0.1'
options[:port] = (options[:port] || 6379).to_i
p options
EM.connect options[:host], options[:port], self, options
end
def initialize(options = {})
@host = options[:host]
@port = options[:port]
@db = (options[:db] || 0).to_i
@password = options[:password]
@logger = options[:logger]
@error_callback = lambda do |code|
err = RedisError.new
err.code = code
raise err, "Redis server returned error code: #{code}"
end
@values = []
# These commands should be first
auth_and_select_db
end
def auth_and_select_db
call_command(["auth", @password]) if @password
call_command(["select", @db]) unless @db == 0
end
private :auth_and_select_db
def connection_completed
@logger.debug { "Connected to #{@host}:#{@port}" } if @logger
@redis_callbacks = []
@multibulk_n = false
@reconnecting = false
@connected = true
succeed
end
# 19Feb09 Switched to a custom parser, LineText2 is recursive and can cause
# stack overflows when there is too much data.
# include EM::P::LineText2
def receive_data(data)
p :recv => data
(@buffer ||= '') << data
while index = @buffer.index(DELIM)
begin
line = @buffer.slice!(0, index+2)
process_cmd line
rescue ParserError
@buffer[0...0] = line
break
end
end
end
def process_cmd(line)
puts "*** processing #{line}"
# first character of buffer will always be the response type
reply_type = line[0, 1]
reply_args = line.slice(1..-3) # remove type character and \r\n
case reply_type
#e.g. -MISSING
when MINUS
# Missing, dispatch empty response
dispatch_response(nil)
# e.g. +OK
when PLUS
dispatch_response(reply_args)
# e.g. $3\r\nabc\r\n
# 'bulk' is more complex because it could be part of multi-bulk
when DOLLAR
data_len = Integer(reply_args)
if data_len == -1 # expect no data; return nil
dispatch_response(nil)
elsif @buffer.size >= data_len + 2 # buffer is full of expected data
dispatch_response(@buffer.slice!(0, data_len))
@buffer.slice!(0,2) # tossing \r\n
else # buffer isn't full or nil
# TODO: don't control execution with exceptions
raise ParserError
end
#e.g. :8
when COLON
dispatch_response(Integer(reply_args))
#e.g. *2\r\n$1\r\na\r\n$1\r\nb\r\n
when ASTERISK
multibulk_count = Integer(reply_args)
if multibulk_count == -1
dispatch_response([])
else
start_multibulk(multibulk_count)
end
# Whu?
else
# TODO: get rid of this exception
raise ProtocolError, "reply type not recognized: #{line.strip}"
end
end
def dispatch_response(value)
if @multibulk_n
@multibulk_values << value
@multibulk_n -= 1
if @multibulk_n == 0
value = @multibulk_values
@multibulk_n = false
else
return
end
end
callback = @redis_callbacks.shift
if callback.kind_of?(Array) && callback.length == 2
processor, blk = callback
value = processor.call(value) if processor
blk.call(value) if blk
elsif callback.kind_of?(Array) && callback.length == 3
processor, pipeline_count, blk = callback
value = processor.call(value) if processor
@values << value
if pipeline_count > 1
@redis_callbacks.unshift [processor, pipeline_count - 1, blk]
else
blk.call(@values) if blk
@values = []
end
end
end
def start_multibulk(multibulk_count)
@multibulk_n = multibulk_count
@multibulk_values = []
end
def unbind
@logger.debug { "Disconnected" } if @logger
if @connected || @reconnecting
EM.add_timer(1) do
@logger.debug { "Reconnecting to #{@host}:#{@port}" } if @logger
reconnect @host, @port
auth_and_select_db
end
@connected = false
@reconnecting = true
@deferred_status = nil
else
# TODO: get rid of this exception
raise 'Unable to connect to redis server'
end
end
private
def get_size(string)
string.respond_to?(:bytesize) ? string.bytesize : string.size
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment