@armstrjare
Created March 4, 2011 01:42
Buffer pipelined Redis calls and flush them once a configurable number of commands has queued up.
# Helper class for buffering pipelined Redis calls
#
# Options:
#   - flush_at : The number of commands to keep in the buffer before flushing.
#                A value of 1 will flush after every call.
#                A value of 0 will *never* flush (flush! must be called explicitly).
#
# Usage:
#   redis_pipeline = RedisPipelineBuffer.new(redis_client, :flush_at => 50)
#
#   redis_pipeline.hset(some_redis_key, some_field, some_value)
#   redis_pipeline.sadd(some_redis_key, some_value)
#   ... 1000 times
#   # will auto flush to redis (every 50 buffered commands)
#   ... some more times ...
#   redis_pipeline.flush! # to make sure everything in the buffer is flushed.
#
class RedisPipelineBuffer
  # NOTE: cattr_accessor is provided by ActiveSupport (e.g. inside a Rails app).
  @@default_options = { :flush_at => 1000 }
  cattr_accessor :default_options

  def initialize(redis, options = {})
    @redis = redis # Redis client instance to send commands to
    @queue = []    # Queue of unflushed commands (must be initialized per instance)
    @options = @@default_options.merge(options)
  end

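  # Flush the buffer
  # Returns: the number of commands sent through the pipeline
  def flush!
    len = @queue.length
    unless len.zero?
      # redis-rb names this method `pipelined`; recent versions expect the
      # commands to be issued on the object yielded to the block.
      @redis.pipelined do |pipe|
        @queue.each { |cmd| pipe.send(*cmd) }
      end
    end
    @queue.clear
    len
  end
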
  # Catch any command sent to this object and queue it; the queued commands are
  # dispatched to the Redis client only when the specified buffer limit is reached
  def method_missing(*args)
    @queue << args
    flush_if_needed!
  end

  private

  def flush_if_needed!
    if @options[:flush_at] && @options[:flush_at] != 0
      flush! if @queue.length >= @options[:flush_at]
    end
  end
end
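
The flush threshold can be tried out without a Redis server. The snippet below is only a sketch under stated assumptions: `FakeRedis` and `Recorder` are hypothetical stand-ins that record each pipelined batch instead of sending it, and `MiniPipelineBuffer` is a condensed copy of the buffering logic (no ActiveSupport) so the example runs on its own. None of these names belong to redis-rb or to the class above.

```ruby
# Hypothetical recorder: collects every command called on it.
class Recorder
  attr_reader :commands

  def initialize
    @commands = []
  end

  def method_missing(name, *args)
    @commands << [name, *args]
  end
end

# Hypothetical stand-in for a Redis client. It mimics only the block form
# of `pipelined` and stores each batch of commands it receives.
class FakeRedis
  attr_reader :batches

  def initialize
    @batches = []
  end

  def pipelined
    recorder = Recorder.new
    yield recorder
    @batches << recorder.commands
  end
end

# Condensed copy of the buffering logic, for illustration only.
class MiniPipelineBuffer
  def initialize(redis, flush_at: 1000)
    @redis = redis
    @flush_at = flush_at
    @queue = []
  end

  # Sends all queued commands in one pipelined batch; returns how many were sent.
  def flush!
    len = @queue.length
    unless len.zero?
      @redis.pipelined { |pipe| @queue.each { |cmd| pipe.send(*cmd) } }
    end
    @queue.clear
    len
  end

  # Queues any command; flushes once the threshold is reached (0 disables auto flush).
  def method_missing(*args)
    @queue << args
    flush! if @flush_at != 0 && @queue.length >= @flush_at
  end
end

redis = FakeRedis.new
buffer = MiniPipelineBuffer.new(redis, flush_at: 3)

7.times { |i| buffer.sadd("numbers", i) } # two automatic flushes of 3 commands
remaining = buffer.flush!                 # flushes the one leftover command

puts redis.batches.map(&:length).inspect
puts remaining
```

With `flush_at: 3`, seven queued commands produce two automatic batches and leave one command in the buffer, which is why the final explicit `flush!` matters.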