Skip to content

Instantly share code, notes, and snippets.

@xuanyu-h
Created September 30, 2016 03:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xuanyu-h/60ae15fbdb78dfe0b5cfe1dcb5910302 to your computer and use it in GitHub Desktop.
Save xuanyu-h/60ae15fbdb78dfe0b5cfe1dcb5910302 to your computer and use it in GitHub Desktop.
redis queue
# encoding: utf-8
require 'redis'
require 'redis/connection/hiredis'
class RedisQueue
attr_reader :redis, :queue, :buffer, :timeout
attr_accessor :buffer_max
def initialize(queue_name, options = {})
if !queue_name.is_a?(String) || queue_name.empty?
raise ArgumentError, "Queue name must be a non empty string"
end
@queue = "#{queue_name}:source"
@buffer = "#{queue_name}:buffer"
@redis = options.fetch(:redis, Redis.current)
@buffer_max = options.fetch(:conc_num, 8)
end
# Return the buffer max num
#
# @return [int]
alias :conc_num :buffer_max
# Reset the buffer max num
#
# @return [int]
alias :conc_num= :buffer_max=
# Returns the number of queue elements
#
# @return [int]
def length
redis.llen(queue)
end
alias :size :length
# Checks whether the queue is empty
#
# @return [true, false]
def empty?
!(length > 0)
end
alias :empty :empty?
# Access the first element of queue
#
# @return [Object]
def front
redis.lindex(queue, 0)
end
# Access the last element of queue
#
# @return [Object]
def back
redis.lindex(queue, -1)
end
# Access all elements of queue
#
# @return [List of Object]
def list
redis.lrange(queue, 0, -1)
end
# Access all elements of buffer
#
# @return [List of Object]
def runners
redis.lrange(buffer, 0, -1)
end
# Removes the first element of queue and push it to buffer
# if the buffer is fill, return nil
#
# @return [Object]
def pop
redis.rpoplpush(queue, buffer) unless buffer_is_fill?
end
# Inserts element at the end of queue and return the size of queue
#
# @return [int]
def push(obj)
redis.lpush(queue, obj)
end
alias :<< :push
# Remove one element of queue
# return false if the element not exist
#
# @return [true, false]
def remove(obj)
(redis.lrem(queue, 1, obj) != 0) ? true : false
end
alias :delete :remove
# Remove one element of buffer
# return false if the element not exist
#
# @return [true, false]
def commit(obj)
(redis.lrem(buffer, 1, obj) != 0) ? true : false
end
# Clear the queue and buffer
def clear
redis.del(queue)
redis.del(buffer)
end
private
def buffer_is_fill?
redis.llen(buffer) >= buffer_max
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment